diff --git a/datanode/candlesv2/candle_updates.go b/datanode/candlesv2/candle_updates.go index 10414f9ea0..7bacfe29ba 100644 --- a/datanode/candlesv2/candle_updates.go +++ b/datanode/candlesv2/candle_updates.go @@ -17,7 +17,9 @@ package candlesv2 import ( "context" + "errors" "fmt" + "sync" "sync/atomic" "time" @@ -25,6 +27,8 @@ import ( "code.vegaprotocol.io/vega/logging" ) +var ErrNewSubscriberNotReady = errors.New("new subscriber was not ready to receive the last candle data") + type candleSource interface { GetCandleDataForTimeSpan(ctx context.Context, candleID string, from *time.Time, to *time.Time, p entities.CursorPagination) ([]entities.Candle, entities.PageInfo, error) @@ -45,23 +49,28 @@ func (m subscriptionMsg) String() string { } type CandleUpdates struct { - log *logging.Logger - candleSource candleSource - candleID string - subscriptionMsgChan chan subscriptionMsg - nextSubscriptionID atomic.Uint64 - config CandleUpdatesConfig + log *logging.Logger + candleSource candleSource + candleID string + subscriptionMsgCh chan subscriptionMsg + nextSubscriptionID atomic.Uint64 + config CandleUpdatesConfig + subs map[string]chan entities.Candle + mu *sync.RWMutex + lastCandle *entities.Candle } func NewCandleUpdates(ctx context.Context, log *logging.Logger, candleID string, candleSource candleSource, config CandleUpdatesConfig, ) *CandleUpdates { ces := &CandleUpdates{ - log: log, - candleSource: candleSource, - candleID: candleID, - config: config, - subscriptionMsgChan: make(chan subscriptionMsg, config.CandleUpdatesStreamSubscriptionMsgBufferSize), + log: log, + candleSource: candleSource, + candleID: candleID, + config: config, + subscriptionMsgCh: make(chan subscriptionMsg, config.CandleUpdatesStreamSubscriptionMsgBufferSize), + subs: map[string]chan entities.Candle{}, + mu: &sync.RWMutex{}, } go ces.run(ctx) @@ -70,77 +79,88 @@ func NewCandleUpdates(ctx context.Context, log *logging.Logger, candleID string, } func (s *CandleUpdates) run(ctx context.Context) { - subscriptions := map[string]chan entities.Candle{} - defer closeAllSubscriptions(subscriptions) + defer s.closeAllSubscriptions() ticker := time.NewTicker(s.config.CandleUpdatesStreamInterval.Duration) defer ticker.Stop() - var lastCandle *entities.Candle - errorGettingCandleUpdates := false + candleUpdatesFailed := false + updateCandles := func(now time.Time) *entities.Candle { + // no subscriptions, don't update candles and remove last candle. + if len(s.subs) == 0 { + return nil + } + candles, err := s.getCandleUpdates(ctx, now) + if err != nil { + if !candleUpdatesFailed { + s.log.Error("Failed to get candles for candle id", logging.String("candle", s.candleID), logging.Error(err)) + } + candleUpdatesFailed = true + return s.lastCandle // keep last candle we successfully obtained + } + if candleUpdatesFailed { + s.log.Info("Successfully got candles for candle id", logging.String("candle", s.candleID)) + candleUpdatesFailed = false + } + if len(candles) == 0 { + return s.lastCandle // no new data, just keep the reference to the last candle we had + } + // send the new data to all subscribers. + _ = s.sendCandlesToSubscribers(candles, s.subs) + return &candles[len(candles)-1] // update last candle reference + } for { select { case <-ctx.Done(): return - case subscriptionMsg := <-s.subscriptionMsgChan: - subscriptions = s.handleSubscription(subscriptions, subscriptionMsg, lastCandle) + case subscriptionMsg := <-s.subscriptionMsgCh: + s.mu.Lock() + s.handleSubscription(subscriptionMsg) + s.mu.Unlock() case now := <-ticker.C: - if len(subscriptions) == 0 { - lastCandle = nil - continue - } - candles, err := s.getCandleUpdates(ctx, lastCandle, now) - if err != nil { - if !errorGettingCandleUpdates { - s.log.Errorf("failed to get candles for candle id", logging.String("candle", s.candleID), logging.Error(err)) - } - errorGettingCandleUpdates = true - continue - } - if errorGettingCandleUpdates { - s.log.Infof("Successfully got candles for candle", logging.String("candle", s.candleID)) - errorGettingCandleUpdates = false - } - if len(candles) > 0 { - lastCandle = &candles[len(candles)-1] - } - subscriptions = s.sendCandlesToSubscribers(candles, subscriptions) + s.mu.RLock() + s.lastCandle = updateCandles(now) + s.mu.RUnlock() } } } -func (s *CandleUpdates) handleSubscription(subscriptions map[string]chan entities.Candle, subscription subscriptionMsg, lastCandle *entities.Candle) map[string]chan entities.Candle { +func (s *CandleUpdates) handleSubscription(subscription subscriptionMsg) { if subscription.subscribe { - return s.addSubscription(subscriptions, subscription, lastCandle) + s.addSubscription(subscription) + return } - return removeSubscription(subscriptions, subscription.id) + s.removeSubscription(subscription.id) } -func (s *CandleUpdates) addSubscription(subscriptions map[string]chan entities.Candle, subscription subscriptionMsg, lastCandle *entities.Candle) map[string]chan entities.Candle { - if lastCandle != nil { - if rm := s.sendCandlesToSubscribers([]entities.Candle{*lastCandle}, map[string]chan entities.Candle{subscription.id: subscription.out}); len(rm) == 0 { - // try to send the last candle data to the new subscription, if it fails, don't update the map - return subscriptions - } +func (s *CandleUpdates) addSubscription(subscription subscriptionMsg) { + if s.lastCandle == nil { + s.subs[subscription.id] = subscription.out + return + } + newSub := map[string]chan entities.Candle{ + subscription.id: subscription.out, + } + if rm := s.sendCandlesToSubscribers([]entities.Candle{*s.lastCandle}, newSub); len(rm) == 0 { + s.subs[subscription.id] = subscription.out } - subscriptions[subscription.id] = subscription.out - return subscriptions } -func removeSubscription(subscriptions map[string]chan entities.Candle, subscriptionID string) map[string]chan entities.Candle { - if ch, ok := subscriptions[subscriptionID]; ok { - // first delete - delete(subscriptions, subscriptionID) - // then close +func (s *CandleUpdates) removeSubscription(id string) { + // no lock acquired, the map HAS to be locked when this function is called. + if ch, ok := s.subs[id]; ok { close(ch) + delete(s.subs, id) } - return subscriptions } -func closeAllSubscriptions(subscribers map[string]chan entities.Candle) { - for _, subscriber := range subscribers { +func (s *CandleUpdates) closeAllSubscriptions() { + s.mu.Lock() + s.lastCandle = nil + for _, subscriber := range s.subs { close(subscriber) } + s.mu.Unlock() } // Subscribe returns a unique subscription id and channel on which updates will be sent. @@ -148,52 +168,78 @@ func (s *CandleUpdates) Subscribe() (string, <-chan entities.Candle, error) { out := make(chan entities.Candle, s.config.CandleUpdatesStreamBufferSize) nextID := s.nextSubscriptionID.Add(1) - subscriptionID := fmt.Sprintf("%s-%d", s.candleID, nextID) + id := fmt.Sprintf("%s-%d", s.candleID, nextID) + var err error + if s.config.CandleUpdatesStreamSubscriptionMsgBufferSize == 0 { + // immediately add, acquire the lock and add to the map. + s.mu.Lock() + defer s.mu.Unlock() + s.subs[id] = out + // we have some data to send, then try this immediately + if s.lastCandle != nil { + newSub := map[string]chan entities.Candle{ + id: out, + } + // try to send the last candle to the new subscriber, this will remove the last sub + // and close the channel if the send fails. + if rm := s.sendCandlesToSubscribers([]entities.Candle{*s.lastCandle}, newSub); len(rm) != 0 { + // if rm is not empty, the new subscriber was removed, and the channel was closed. + return "", nil, ErrNewSubscriberNotReady + } + } + return id, out, nil + } msg := subscriptionMsg{ subscribe: true, - id: subscriptionID, + id: id, out: out, } - err := s.sendSubscriptionMessage(msg) + err = s.sendSubscriptionMessage(msg) if err != nil { return "", nil, err } - return subscriptionID, out, nil + return id, out, nil } -func (s *CandleUpdates) Unsubscribe(subscriptionID string) error { +func (s *CandleUpdates) Unsubscribe(id string) error { + if s.config.CandleUpdatesStreamSubscriptionMsgBufferSize == 0 { + // instantly unsubscribe, acquire the lock and remove from the map + s.mu.Lock() + if ch, ok := s.subs[id]; ok { + close(ch) + delete(s.subs, id) + } + s.mu.Unlock() + return nil + } msg := subscriptionMsg{ subscribe: false, - id: subscriptionID, + id: id, } return s.sendSubscriptionMessage(msg) } func (s *CandleUpdates) sendSubscriptionMessage(msg subscriptionMsg) error { - if s.config.CandleUpdatesStreamSubscriptionMsgBufferSize == 0 { - s.subscriptionMsgChan <- msg - } else { - select { - case s.subscriptionMsgChan <- msg: - default: - return fmt.Errorf("failed to send subscription message \"%s\", subscription message buffer is full, try again later", msg) - } + select { + case s.subscriptionMsgCh <- msg: + return nil + default: + return fmt.Errorf("failed to send subscription message \"%s\", subscription message buffer is full, try again later", msg) } - return nil } -func (s *CandleUpdates) getCandleUpdates(ctx context.Context, lastCandle *entities.Candle, now time.Time) ([]entities.Candle, error) { +func (s *CandleUpdates) getCandleUpdates(ctx context.Context, now time.Time) ([]entities.Candle, error) { ctx, cancelFn := context.WithTimeout(ctx, s.config.CandlesFetchTimeout.Duration) defer cancelFn() var updates []entities.Candle var err error - if lastCandle != nil { - start := lastCandle.PeriodStart + if s.lastCandle != nil { + start := s.lastCandle.PeriodStart var candles []entities.Candle candles, _, err = s.candleSource.GetCandleDataForTimeSpan(ctx, s.candleID, &start, &now, entities.CursorPagination{}) @@ -205,33 +251,34 @@ func (s *CandleUpdates) getCandleUpdates(ctx context.Context, lastCandle *entiti updates = make([]entities.Candle, 0, len(candles)) for _, candle := range candles { // not before so either newer, or the same (last) candle should be returned. - if !candle.LastUpdateInPeriod.Before(lastCandle.LastUpdateInPeriod) || !candle.PeriodStart.Before(lastCandle.PeriodStart) { + if !candle.LastUpdateInPeriod.Before(s.lastCandle.LastUpdateInPeriod) || !candle.PeriodStart.Before(s.lastCandle.PeriodStart) { updates = append(updates, candle) } } - } else { - updates, _, err = s.candleSource.GetCandleDataForTimeSpan(ctx, s.candleID, nil, &now, entities.CursorPagination{}) + return updates, nil + } + updates, _, err = s.candleSource.GetCandleDataForTimeSpan(ctx, s.candleID, nil, &now, entities.CursorPagination{}) - if err != nil { - return nil, fmt.Errorf("getting candle updates:%w", err) - } + if err != nil { + return nil, fmt.Errorf("getting candle updates:%w", err) } return updates, nil } -func (s *CandleUpdates) sendCandlesToSubscribers(candles []entities.Candle, subscriptions map[string]chan entities.Candle) map[string]chan entities.Candle { - ret := subscriptions - for subscriptionID, outCh := range subscriptions { +func (s *CandleUpdates) sendCandlesToSubscribers(candles []entities.Candle, subscriptions map[string]chan entities.Candle) []string { + rm := make([]string, 0, len(subscriptions)) + for id, outCh := range subscriptions { loop: for _, candle := range candles { select { case outCh <- candle: default: - ret = removeSubscription(ret, subscriptionID) + rm = append(rm, id) + s.removeSubscription(id) break loop } } } - return ret + return rm }