Skip to content

Commit

Permalink
fix: rework candle service, use mutex
Browse files Browse the repository at this point in the history
Signed-off-by: Elias Van Ootegem <[email protected]>
  • Loading branch information
EVODelavega committed Oct 7, 2024
1 parent ba1d73a commit 1344c3e
Showing 1 changed file with 132 additions and 85 deletions.
217 changes: 132 additions & 85 deletions datanode/candlesv2/candle_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@ package candlesv2

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"code.vegaprotocol.io/vega/datanode/entities"
"code.vegaprotocol.io/vega/logging"
)

var ErrNewSubscriberNotReady = errors.New("new subscriber was not ready to receive the last candle data")

type candleSource interface {
GetCandleDataForTimeSpan(ctx context.Context, candleID string, from *time.Time, to *time.Time,
p entities.CursorPagination) ([]entities.Candle, entities.PageInfo, error)
Expand All @@ -45,23 +49,28 @@ func (m subscriptionMsg) String() string {
}

type CandleUpdates struct {
log *logging.Logger
candleSource candleSource
candleID string
subscriptionMsgChan chan subscriptionMsg
nextSubscriptionID atomic.Uint64
config CandleUpdatesConfig
log *logging.Logger
candleSource candleSource
candleID string
subscriptionMsgCh chan subscriptionMsg
nextSubscriptionID atomic.Uint64
config CandleUpdatesConfig
subs map[string]chan entities.Candle
mu *sync.RWMutex
lastCandle *entities.Candle
}

func NewCandleUpdates(ctx context.Context, log *logging.Logger, candleID string, candleSource candleSource,
config CandleUpdatesConfig,
) *CandleUpdates {
ces := &CandleUpdates{
log: log,
candleSource: candleSource,
candleID: candleID,
config: config,
subscriptionMsgChan: make(chan subscriptionMsg, config.CandleUpdatesStreamSubscriptionMsgBufferSize),
log: log,
candleSource: candleSource,
candleID: candleID,
config: config,
subscriptionMsgCh: make(chan subscriptionMsg, config.CandleUpdatesStreamSubscriptionMsgBufferSize),
subs: map[string]chan entities.Candle{},
mu: &sync.RWMutex{},
}

go ces.run(ctx)
Expand All @@ -70,130 +79,167 @@ func NewCandleUpdates(ctx context.Context, log *logging.Logger, candleID string,
}

func (s *CandleUpdates) run(ctx context.Context) {
subscriptions := map[string]chan entities.Candle{}
defer closeAllSubscriptions(subscriptions)
defer s.closeAllSubscriptions()

ticker := time.NewTicker(s.config.CandleUpdatesStreamInterval.Duration)
defer ticker.Stop()
var lastCandle *entities.Candle

errorGettingCandleUpdates := false
candleUpdatesFailed := false
updateCandles := func(now time.Time) *entities.Candle {
// no subscriptions, don't update candles and remove last candle.
if len(s.subs) == 0 {
return nil
}
candles, err := s.getCandleUpdates(ctx, now)
if err != nil {
if !candleUpdatesFailed {
s.log.Error("Failed to get candles for candle id", logging.String("candle", s.candleID), logging.Error(err))
}
candleUpdatesFailed = true
return s.lastCandle // keep last candle we successfully obtained
}
if candleUpdatesFailed {
s.log.Info("Successfully got candles for candle id", logging.String("candle", s.candleID))
candleUpdatesFailed = false
}
if len(candles) == 0 {
return s.lastCandle // no new data, just keep the reference to the last candle we had
}
// send the new data to all subscribers.
_ = s.sendCandlesToSubscribers(candles, s.subs)
return &candles[len(candles)-1] // update last candle reference
}
for {
select {
case <-ctx.Done():
return
case subscriptionMsg := <-s.subscriptionMsgChan:
subscriptions = s.handleSubscription(subscriptions, subscriptionMsg, lastCandle)
case subscriptionMsg := <-s.subscriptionMsgCh:
s.mu.Lock()
s.handleSubscription(subscriptionMsg)
s.mu.Unlock()
case now := <-ticker.C:
if len(subscriptions) == 0 {
lastCandle = nil
continue
}
candles, err := s.getCandleUpdates(ctx, lastCandle, now)
if err != nil {
if !errorGettingCandleUpdates {
s.log.Errorf("failed to get candles for candle id", logging.String("candle", s.candleID), logging.Error(err))
}
errorGettingCandleUpdates = true
continue
}
if errorGettingCandleUpdates {
s.log.Infof("Successfully got candles for candle", logging.String("candle", s.candleID))
errorGettingCandleUpdates = false
}
if len(candles) > 0 {
lastCandle = &candles[len(candles)-1]
}
subscriptions = s.sendCandlesToSubscribers(candles, subscriptions)
s.mu.RLock()
s.lastCandle = updateCandles(now)
s.mu.RUnlock()
}
}
}

func (s *CandleUpdates) handleSubscription(subscriptions map[string]chan entities.Candle, subscription subscriptionMsg, lastCandle *entities.Candle) map[string]chan entities.Candle {
func (s *CandleUpdates) handleSubscription(subscription subscriptionMsg) {
if subscription.subscribe {
return s.addSubscription(subscriptions, subscription, lastCandle)
s.addSubscription(subscription)
return
}
return removeSubscription(subscriptions, subscription.id)
s.removeSubscription(subscription.id)
}

func (s *CandleUpdates) addSubscription(subscriptions map[string]chan entities.Candle, subscription subscriptionMsg, lastCandle *entities.Candle) map[string]chan entities.Candle {
if lastCandle != nil {
if rm := s.sendCandlesToSubscribers([]entities.Candle{*lastCandle}, map[string]chan entities.Candle{subscription.id: subscription.out}); len(rm) == 0 {
// try to send the last candle data to the new subscription, if it fails, don't update the map
return subscriptions
}
func (s *CandleUpdates) addSubscription(subscription subscriptionMsg) {
if s.lastCandle == nil {
s.subs[subscription.id] = subscription.out
return
}
newSub := map[string]chan entities.Candle{
subscription.id: subscription.out,
}
if rm := s.sendCandlesToSubscribers([]entities.Candle{*s.lastCandle}, newSub); len(rm) == 0 {
s.subs[subscription.id] = subscription.out
}
subscriptions[subscription.id] = subscription.out
return subscriptions
}

func removeSubscription(subscriptions map[string]chan entities.Candle, subscriptionID string) map[string]chan entities.Candle {
if ch, ok := subscriptions[subscriptionID]; ok {
// first delete
delete(subscriptions, subscriptionID)
// then close
func (s *CandleUpdates) removeSubscription(id string) {
// no lock acquired, the map HAS to be locked when this function is called.
if ch, ok := s.subs[id]; ok {
close(ch)
delete(s.subs, id)
}
return subscriptions
}

func closeAllSubscriptions(subscribers map[string]chan entities.Candle) {
for _, subscriber := range subscribers {
func (s *CandleUpdates) closeAllSubscriptions() {
s.mu.Lock()
s.lastCandle = nil
for _, subscriber := range s.subs {
close(subscriber)
}
s.mu.Unlock()
}

// Subscribe returns a unique subscription id and channel on which updates will be sent.
func (s *CandleUpdates) Subscribe() (string, <-chan entities.Candle, error) {
out := make(chan entities.Candle, s.config.CandleUpdatesStreamBufferSize)

nextID := s.nextSubscriptionID.Add(1)
subscriptionID := fmt.Sprintf("%s-%d", s.candleID, nextID)
id := fmt.Sprintf("%s-%d", s.candleID, nextID)
var err error

if s.config.CandleUpdatesStreamSubscriptionMsgBufferSize == 0 {
// immediately add, acquire the lock and add to the map.
s.mu.Lock()
defer s.mu.Unlock()
s.subs[id] = out
// we have some data to send, then try this immediately
if s.lastCandle != nil {
newSub := map[string]chan entities.Candle{
id: out,
}
// try to send the last candle to the new subscriber, this will remove the last sub
// and close the channel if the send fails.
if rm := s.sendCandlesToSubscribers([]entities.Candle{*s.lastCandle}, newSub); len(rm) != 0 {
// if rm is not empty, the new subscriber was removed, and the channel was closed.
return "", nil, ErrNewSubscriberNotReady
}
}
return id, out, nil
}
msg := subscriptionMsg{
subscribe: true,
id: subscriptionID,
id: id,
out: out,
}

err := s.sendSubscriptionMessage(msg)
err = s.sendSubscriptionMessage(msg)
if err != nil {
return "", nil, err
}

return subscriptionID, out, nil
return id, out, nil
}

func (s *CandleUpdates) Unsubscribe(subscriptionID string) error {
func (s *CandleUpdates) Unsubscribe(id string) error {
if s.config.CandleUpdatesStreamSubscriptionMsgBufferSize == 0 {
// instantly unsubscribe, acquire the lock and remove from the map
s.mu.Lock()
if ch, ok := s.subs[id]; ok {
close(ch)
delete(s.subs, id)
}
s.mu.Unlock()
return nil
}
msg := subscriptionMsg{
subscribe: false,
id: subscriptionID,
id: id,
}

return s.sendSubscriptionMessage(msg)
}

func (s *CandleUpdates) sendSubscriptionMessage(msg subscriptionMsg) error {
if s.config.CandleUpdatesStreamSubscriptionMsgBufferSize == 0 {
s.subscriptionMsgChan <- msg
} else {
select {
case s.subscriptionMsgChan <- msg:
default:
return fmt.Errorf("failed to send subscription message \"%s\", subscription message buffer is full, try again later", msg)
}
select {
case s.subscriptionMsgCh <- msg:
return nil
default:
return fmt.Errorf("failed to send subscription message \"%s\", subscription message buffer is full, try again later", msg)
}
return nil
}

func (s *CandleUpdates) getCandleUpdates(ctx context.Context, lastCandle *entities.Candle, now time.Time) ([]entities.Candle, error) {
func (s *CandleUpdates) getCandleUpdates(ctx context.Context, now time.Time) ([]entities.Candle, error) {
ctx, cancelFn := context.WithTimeout(ctx, s.config.CandlesFetchTimeout.Duration)
defer cancelFn()

var updates []entities.Candle
var err error
if lastCandle != nil {
start := lastCandle.PeriodStart
if s.lastCandle != nil {
start := s.lastCandle.PeriodStart
var candles []entities.Candle
candles, _, err = s.candleSource.GetCandleDataForTimeSpan(ctx, s.candleID, &start, &now, entities.CursorPagination{})

Expand All @@ -205,33 +251,34 @@ func (s *CandleUpdates) getCandleUpdates(ctx context.Context, lastCandle *entiti
updates = make([]entities.Candle, 0, len(candles))
for _, candle := range candles {
// not before so either newer, or the same (last) candle should be returned.
if !candle.LastUpdateInPeriod.Before(lastCandle.LastUpdateInPeriod) || !candle.PeriodStart.Before(lastCandle.PeriodStart) {
if !candle.LastUpdateInPeriod.Before(s.lastCandle.LastUpdateInPeriod) || !candle.PeriodStart.Before(s.lastCandle.PeriodStart) {
updates = append(updates, candle)
}
}
} else {
updates, _, err = s.candleSource.GetCandleDataForTimeSpan(ctx, s.candleID, nil, &now, entities.CursorPagination{})
return updates, nil
}
updates, _, err = s.candleSource.GetCandleDataForTimeSpan(ctx, s.candleID, nil, &now, entities.CursorPagination{})

if err != nil {
return nil, fmt.Errorf("getting candle updates:%w", err)
}
if err != nil {
return nil, fmt.Errorf("getting candle updates:%w", err)
}

return updates, nil
}

func (s *CandleUpdates) sendCandlesToSubscribers(candles []entities.Candle, subscriptions map[string]chan entities.Candle) map[string]chan entities.Candle {
ret := subscriptions
for subscriptionID, outCh := range subscriptions {
func (s *CandleUpdates) sendCandlesToSubscribers(candles []entities.Candle, subscriptions map[string]chan entities.Candle) []string {
rm := make([]string, 0, len(subscriptions))
for id, outCh := range subscriptions {
loop:
for _, candle := range candles {
select {
case outCh <- candle:
default:
ret = removeSubscription(ret, subscriptionID)
rm = append(rm, id)
s.removeSubscription(id)
break loop
}
}
}
return ret
return rm
}

0 comments on commit 1344c3e

Please sign in to comment.