Skip to content

Commit

Permalink
Merge pull request #143 from kaleido-io/fix-142
Browse files Browse the repository at this point in the history
Add synchronization to broadcast when topic list changes
  • Loading branch information
nguyer authored Sep 2, 2021
2 parents 2e28ec4 + b59d554 commit cf9e5b0
Showing 1 changed file with 24 additions and 4 deletions.
28 changes: 24 additions & 4 deletions internal/ws/wsserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ func (s *webSocketServer) Close() {

func (s *webSocketServer) getTopic(topic string) *webSocketTopic {
s.mux.Lock()
defer s.mux.Unlock()
t, exists := s.topics[topic]
if !exists {
t = &webSocketTopic{
Expand All @@ -135,6 +134,9 @@ func (s *webSocketServer) getTopic(topic string) *webSocketTopic {
}
s.topics[topic] = t
s.topicMap[topic] = make(map[string]*webSocketConnection)
}
s.mux.Unlock()
if !exists {
// Signal to the broadcaster that a new topic has been added
s.newTopic <- true
}
Expand Down Expand Up @@ -162,6 +164,9 @@ func (s *webSocketServer) SendReply(message interface{}) {
func (s *webSocketServer) processBroadcasts() {
var topics []string
buildCases := func() []reflect.SelectCase {
// only hold the lock while we're building the list of cases (not while doing the select)
s.mux.Lock()
defer s.mux.Unlock()
topics = make([]string, len(s.topics))
cases := make([]reflect.SelectCase, len(s.topics)+1)
i := 0
Expand All @@ -187,20 +192,35 @@ func (s *webSocketServer) processBroadcasts() {
} else {
// Message on one of the existing topics
// Gather all connections interested in this topic and send to them
s.mux.Lock()
topic := topics[chosen]
s.broadcastToConnections(s.topicMap[topic], value.Interface())
wsconns := getConnListFromMap(s.topicMap[topic])
s.mux.Unlock()
s.broadcastToConnections(wsconns, value.Interface())
}
}
}

// getConnListFromMap is a simple helper to snapshot a map into a list, which can be called with a short-lived lock
func getConnListFromMap(tm map[string]*webSocketConnection) []*webSocketConnection {
wsconns := make([]*webSocketConnection, 0, len(tm))
for _, c := range tm {
wsconns = append(wsconns, c)
}
return wsconns
}

func (s *webSocketServer) processReplies() {
for {
message := <-s.replyChannel
s.broadcastToConnections(s.replyMap, message)
s.mux.Lock()
wsconns := getConnListFromMap(s.replyMap)
s.mux.Unlock()
s.broadcastToConnections(wsconns, message)
}
}

func (s *webSocketServer) broadcastToConnections(connections map[string]*webSocketConnection, message interface{}) {
func (s *webSocketServer) broadcastToConnections(connections []*webSocketConnection, message interface{}) {
for _, c := range connections {
c.broadcast <- message
}
Expand Down

0 comments on commit cf9e5b0

Please sign in to comment.