Skip to content

Commit

Permalink
Merge pull request #37 from mailgun/thrawn/develop
Browse files Browse the repository at this point in the history
Added Broadcaster
  • Loading branch information
thrawn01 authored Nov 29, 2018
2 parents 85b3be5 + e21a90f commit 8dbb295
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 0 deletions.
52 changes: 52 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -397,3 +397,55 @@ Provides user agent parsing into Mailgun [ClientInfo](https://github.com/mailgun
```
clientInfo := useragent.Parse("Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.17 (KHTML, like Gecko) Chrome/24.0.1312.70 Safari/537.17")
```

## Broadcaster
Allow the user to notify multiple goroutines of an event. This implementation guarantees every goroutine will wake
for every broadcast sent. In the event the goroutine falls behind and more broadcasts() are sent than the goroutine
has handled the broadcasts are buffered up to 10,000 broadcasts. Once the broadcast buffer limit is reached calls
to broadcast() will block until goroutines consuming the broadcasts can catch up.

```go
broadcaster := holster.NewBroadcaster()
done := make(chan struct{})
var mutex sync.Mutex
var chat []string

// Start some simple chat clients that are responsible for
// sending the contents of the []chat slice to their clients
for i := 0; i < 2; i++ {
go func(idx int) {
var clientIndex int
for {
mutex.Lock()
if clientIndex != len(chat) {
// Pretend we are sending a message to our client via a socket
fmt.Printf("Client [%d] Chat: %s\n", idx, chat[clientIndex])
clientIndex++
mutex.Unlock()
continue
}
mutex.Unlock()

// Wait for more chats to be added to chat[]
select {
case <-broadcaster.WaitChan(string(idx)):
case <-done:
return
}
}
}(i)
}

// Add some chat lines to the []chat slice
for i := 0; i < 5; i++ {
mutex.Lock()
chat = append(chat, fmt.Sprintf("Message '%d'", i))
mutex.Unlock()

// Notify any clients there are new chats to read
broadcaster.Broadcast()
}

// Tell the clients to quit
close(done)
```
73 changes: 73 additions & 0 deletions broadcast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package holster

import "sync"

type Broadcaster interface {
WaitChan(string) chan struct{}
Wait(string)
Broadcast()
Done()
}

// Broadcasts to goroutines a new event has occurred and any waiting go routines should
// stop waiting and do work. The current implementation is limited to 10,0000 unconsumed
// broadcasts. If the user broadcasts more events than can be consumed calls to broadcast()
// will eventually block until the goroutines can catch up. This ensures goroutines will
// receive at least one event per broadcast() call.
type broadcast struct {
clients map[string]chan struct{}
done chan struct{}
mutex sync.Mutex
}

func NewBroadcaster() Broadcaster {
return &broadcast{
clients: make(map[string]chan struct{}),
done: make(chan struct{}),
}
}

// Notify all Waiting goroutines
func (b *broadcast) Broadcast() {
b.mutex.Lock()
for _, channel := range b.clients {
channel <- struct{}{}
}
b.mutex.Unlock()
}

// Cancels any Wait() calls that are currently blocked
func (b *broadcast) Done() {
close(b.done)
}

// Blocks until a broadcast is received
func (b *broadcast) Wait(name string) {
b.mutex.Lock()
channel, ok := b.clients[name]
if !ok {
b.clients[name] = make(chan struct{}, 10000)
channel = b.clients[name]
}
b.mutex.Unlock()

// Wait for a new event or done is closed
select {
case <-channel:
return
case <-b.done:
return
}
}

// Returns a channel the caller can use to wait for a broadcast
func (b *broadcast) WaitChan(name string) chan struct{} {
b.mutex.Lock()
channel, ok := b.clients[name]
if !ok {
b.clients[name] = make(chan struct{}, 10000)
channel = b.clients[name]
}
b.mutex.Unlock()
return channel
}
79 changes: 79 additions & 0 deletions broadcast_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package holster_test

import (
"fmt"
"sync"
"testing"

"github.com/mailgun/holster"
)

func TestBroadcast(t *testing.T) {
broadcaster := holster.NewBroadcaster()
ready := make(chan struct{}, 2)
done := make(chan struct{})
socket := make(chan string, 11)
var mutex sync.Mutex
var chat []string

// Start some simple chat clients that are responsible for
// sending the contents of the []chat slice to their clients
for i := 0; i < 2; i++ {
go func(idx int) {
var clientIndex int
var once sync.Once
for {
mutex.Lock()
if clientIndex != len(chat) {
// Pretend we are sending a message to our client via a socket
socket <- fmt.Sprintf("Client [%d] Chat: %s\n", idx, chat[clientIndex])
clientIndex++
mutex.Unlock()
continue
}
mutex.Unlock()

// Indicate the client is up and ready to receive broadcasts
once.Do(func() {
ready <- struct{}{}
})

// Wait for more chats to be added to chat[]
select {
case <-broadcaster.WaitChan(string(idx)):
case <-done:
return
}
}
}(i)
}

// Wait for the clients to be ready
<-ready
<-ready

// Add some chat lines to the []chat slice
for i := 0; i < 5; i++ {
mutex.Lock()
chat = append(chat, fmt.Sprintf("Message '%d'", i))
mutex.Unlock()

// Notify any clients there are new chats to read
broadcaster.Broadcast()
}

var count int
for msg := range socket {
fmt.Printf(msg)
count++
if count == 10 {
break
}
}

if count != 10 {
t.Errorf("count != 10")
}
// Tell the clients to quit
close(done)
}

0 comments on commit 8dbb295

Please sign in to comment.