Skip to content

Commit

Permalink
fix: remove subscription from main map when channel is closed on firs…
Browse files Browse the repository at this point in the history
…t send

Signed-off-by: Elias Van Ootegem <[email protected]>
  • Loading branch information
EVODelavega committed Sep 25, 2024
1 parent 79ae7ba commit c1d3159
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions datanode/candlesv2/candle_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ func (s *CandleUpdates) handleSubscription(subscriptions map[string]chan entitie
func (s *CandleUpdates) addSubscription(subscriptions map[string]chan entities.Candle, subscription subscriptionMsg, lastCandle *entities.Candle) {
subscriptions[subscription.id] = subscription.out
if lastCandle != nil {
s.sendCandlesToSubscribers([]entities.Candle{*lastCandle}, map[string]chan entities.Candle{subscription.id: subscription.out})
if rm := s.sendCandlesToSubscribers([]entities.Candle{*lastCandle}, map[string]chan entities.Candle{subscription.id: subscription.out}); len(rm) != 0 {
// if send returns a non-empty slice, the new subscription was removed from the map literal, and should also be removed from the main subscriptions map
delete(subscriptions, subscription.id)
}
}
}

Expand Down Expand Up @@ -215,15 +218,18 @@ func (s *CandleUpdates) getCandleUpdates(ctx context.Context, lastCandle *entiti
return updates, nil
}

func (s *CandleUpdates) sendCandlesToSubscribers(candles []entities.Candle, subscriptions map[string]chan entities.Candle) {
func (s *CandleUpdates) sendCandlesToSubscribers(candles []entities.Candle, subscriptions map[string]chan entities.Candle) []string {
rm := []string{}
for subscriptionID, outCh := range subscriptions {
for _, candle := range candles {
select {
case outCh <- candle:
default:
removeSubscription(subscriptions, subscriptionID)
rm = append(rm, subscriptionID)
break
}
}
}
return rm
}

0 comments on commit c1d3159

Please sign in to comment.