Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Breaking Change - All functions that took a Peer now take a *Session #108

Open
wants to merge 26 commits into
base: v2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
239195f
All functions that took a Peer now take a *Session
beatgammit Dec 2, 2015
4621c93
Fix websocket send concurrency issue
beatgammit Jan 12, 2016
6959270
Added mutex to invocation map
beatgammit Jan 13, 2016
1f80efd
Try increasing buffers
beatgammit Jan 13, 2016
7731912
Add error URI to errors in RPC commands
beatgammit Jan 21, 2016
dd3c64a
Fixed error reporting on RPC calls
Feb 5, 2016
73f37c5
Merge branch 'v2' of github.com:jcelliott/turnpike into v2
beatgammit Mar 31, 2016
96f011e
fixed concurrent writes to map in realm
gngeorgiev Mar 25, 2016
742b9c3
added exclude_me support
gngeorgiev Mar 25, 2016
bf55c97
Fixed data races
beatgammit Mar 31, 2016
7e3d305
Fix problems using the wrong types of ids in pub/sub
beatgammit Aug 1, 2016
405ff69
Change all imports to my repo
beatgammit Aug 4, 2016
c88b89f
Changed most references to old repo
beatgammit Aug 4, 2016
ae44bc4
Fix compile errors in examples
beatgammit Aug 4, 2016
4b399d4
Added locks to Client
beatgammit Aug 4, 2016
c1cf8e0
Added more locks for unlikely, but still possible data races
beatgammit Aug 4, 2016
71f77d3
If the websocket is closed, don't close it again
beatgammit Aug 19, 2016
afcecc3
Close session in realm
beatgammit Aug 22, 2016
00ecc28
Send messages in a goroutine
beatgammit Aug 25, 2016
4a8e170
Added sendMsgs channel to send websocket messages without blocking.
acls Aug 25, 2016
90945ea
remove session subscriptions when session ends
acls Aug 25, 2016
20397bd
Clean up client listeners (#2)
beatgammit Oct 31, 2016
4a4b738
Fix data race panic
beatgammit Oct 31, 2016
a8e7cbc
Recover from send on closed channel
beatgammit Jan 16, 2017
aaa21ce
Increase receive timeout to 30 seconds
beatgammit Jun 8, 2017
573f579
Manually close connection on unexpected errors
acls Sep 6, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Turnpike [![Build Status](https://drone.io/github.com/jcelliott/turnpike/status.png)](https://drone.io/github.com/jcelliott/turnpike/latest) [![Coverage Status](https://coveralls.io/repos/jcelliott/turnpike/badge.svg?branch=v2)](https://coveralls.io/r/jcelliott/turnpike?branch=v2) [![GoDoc](https://godoc.org/gopkg.in/jcelliott/turnpike?status.svg)](http://godoc.org/gopkg.in/jcelliott/turnpike.v2)
Turnpike [![Build Status](https://drone.io/github.com/jcelliott/turnpike/status.png)](https://drone.io/github.com/jcelliott/turnpike/latest) [![Coverage Status](https://coveralls.io/repos/jcelliott/turnpike/badge.svg?branch=v2)](https://coveralls.io/r/jcelliott/turnpike?branch=v2) [![GoDoc](https://godoc.org/gopkg.in/beatgammit/turnpike?status.svg)](http://godoc.org/gopkg.in/beatgammit/turnpike.v2)
===

Go implementation of [WAMP](http://wamp.ws/) - The Web Application Messaging Protocol
Expand All @@ -18,25 +18,23 @@ basic stand-alone router. The router library can be used to embed a WAMP router
in another application, or to build a custom router implementation. The client
library can be used to communicate with any WAMP router.

This version of Turnpike supports WAMP v2. For WAMP v1 support see the [v1 branch](https://github.com/jcelliott/turnpike/tree/v1).

Status
---

Turnpike v2 is still under development, but is getting close to a stable
release. If you have any feedback or suggestions, please
[open an issue](https://github.com/jcelliott/turnpike/issues/new).
[open an issue](https://github.com/beatgammit/turnpike/issues/new).

Installation
---

Library:

go get -u gopkg.in/jcelliott/turnpike.v2
go get -u gopkg.in/beatgammit/turnpike.v2

Stand-alone router:

go get -u gopkg.in/jcelliott/turnpike.v2/turnpike
go get -u gopkg.in/beatgammit/turnpike.v2/turnpike

Client library usage
---
Expand All @@ -56,7 +54,7 @@ import (
"log"
"net/http"

"gopkg.in/jcelliott/turnpike.v2"
"gopkg.in/beatgammit/turnpike.v2"
)

func main() {
Expand Down
4 changes: 2 additions & 2 deletions authorizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package turnpike
// Authorize takes the session and the message (request), and returns true if
// the request is authorized, otherwise false.
type Authorizer interface {
Authorize(session Session, msg Message) (bool, error)
Authorize(session *Session, msg Message) (bool, error)
}

// DefaultAuthorizer always returns authorized.
Expand All @@ -18,6 +18,6 @@ func NewDefaultAuthorizer() Authorizer {
return &defaultAuthorizer{}
}

func (da *defaultAuthorizer) Authorize(session Session, msg Message) (bool, error) {
func (da *defaultAuthorizer) Authorize(session *Session, msg Message) (bool, error) {
return true, nil
}
119 changes: 98 additions & 21 deletions broker.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
package turnpike

import (
"sync"
)

// Broker is the interface implemented by an object that handles routing EVENTS
// from Publishers to Subscribers.
type Broker interface {
// Publishes a message to all Subscribers.
Publish(Sender, *Publish)
Publish(*Session, *Publish)
// Subscribes to messages on a URI.
Subscribe(Sender, *Subscribe)
Subscribe(*Session, *Subscribe)
// Unsubscribes from messages on a URI.
Unsubscribe(Sender, *Unsubscribe)
Unsubscribe(*Session, *Unsubscribe)
// Remove all of session's subscriptions.
RemoveSession(*Session)
}

// A super simple broker that matches URIs to Subscribers.
type defaultBroker struct {
routes map[URI]map[ID]Sender
subscriptions map[ID]URI
subscribers map[*Session][]ID

lastRequestId ID

sync.RWMutex
}

// NewDefaultBroker initializes and returns a simple broker that matches URIs to
Expand All @@ -23,73 +34,139 @@ func NewDefaultBroker() Broker {
return &defaultBroker{
routes: make(map[URI]map[ID]Sender),
subscriptions: make(map[ID]URI),
subscribers: make(map[*Session][]ID),
}
}

func (br *defaultBroker) nextRequestId() ID {
br.lastRequestId++
if br.lastRequestId > MAX_REQUEST_ID {
br.lastRequestId = 1
}

return br.lastRequestId
}

// Publish sends a message to all subscribed clients except for the sender.
//
// If msg.Options["acknowledge"] == true, the publisher receives a Published event
// after the message has been sent to all subscribers.
func (br *defaultBroker) Publish(pub Sender, msg *Publish) {
func (br *defaultBroker) Publish(sess *Session, msg *Publish) {
br.RLock()
defer br.RUnlock()

pub := sess.Peer
pubID := NewID()
evtTemplate := Event{
Publication: pubID,
Arguments: msg.Arguments,
ArgumentsKw: msg.ArgumentsKw,
Details: make(map[string]interface{}),
}

excludePublisher := true
if exclude, ok := msg.Options["exclude_me"].(bool); ok {
excludePublisher = exclude
}

for id, sub := range br.routes[msg.Topic] {
// shallow-copy the template
event := evtTemplate
event.Subscription = id
// don't send event to publisher
if sub != pub {
sub.Send(&event)
if sub != pub || !excludePublisher {
go sub.Send(&event)
}
}

// only send published message if acknowledge is present and set to true
if doPub, _ := msg.Options["acknowledge"].(bool); doPub {
pub.Send(&Published{Request: msg.Request, Publication: pubID})
go pub.Send(&Published{Request: msg.Request, Publication: pubID})
}
}

// Subscribe subscribes the client to the given topic.
func (br *defaultBroker) Subscribe(sub Sender, msg *Subscribe) {
func (br *defaultBroker) Subscribe(sess *Session, msg *Subscribe) {
br.Lock()
defer br.Unlock()

if _, ok := br.routes[msg.Topic]; !ok {
br.routes[msg.Topic] = make(map[ID]Sender)
}
id := NewID()
br.routes[msg.Topic][id] = sub

id := br.nextRequestId()
br.routes[msg.Topic][id] = sess.Peer
br.subscriptions[id] = msg.Topic

sub.Send(&Subscribed{Request: msg.Request, Subscription: id})
// subscribers
ids, ok := br.subscribers[sess]
if !ok {
ids = []ID{}
}
ids = append(ids, id)
br.subscribers[sess] = ids

go sess.Peer.Send(&Subscribed{Request: msg.Request, Subscription: id})
}

func (br *defaultBroker) Unsubscribe(sub Sender, msg *Unsubscribe) {
topic, ok := br.subscriptions[msg.Subscription]
if !ok {
func (br *defaultBroker) RemoveSession(sess *Session) {
log.Printf("broker remove peer %p", sess)
br.Lock()
defer br.Unlock()

for _, id := range br.subscribers[sess] {
br.unsubscribe(sess, id)
}
}

func (br *defaultBroker) Unsubscribe(sess *Session, msg *Unsubscribe) {
br.Lock()
defer br.Unlock()

if !br.unsubscribe(sess, msg.Subscription) {
err := &Error{
Type: msg.MessageType(),
Request: msg.Request,
Error: ErrNoSuchSubscription,
}
sub.Send(err)
go sess.Peer.Send(err)
log.Printf("Error unsubscribing: no such subscription %v", msg.Subscription)
return
}
delete(br.subscriptions, msg.Subscription)

go sess.Peer.Send(&Unsubscribed{Request: msg.Request})
}

func (br *defaultBroker) unsubscribe(sess *Session, id ID) bool {
log.Printf("broker unsubscribing: %p, %d", &sess, id)
topic, ok := br.subscriptions[id]
if !ok {
return false
}
delete(br.subscriptions, id)

if r, ok := br.routes[topic]; !ok {
log.Printf("Error unsubscribing: unable to find routes for %s topic", topic)
} else if _, ok := r[msg.Subscription]; !ok {
log.Printf("Error unsubscribing: %s route does not exist for %v subscription", topic, msg.Subscription)
} else if _, ok := r[id]; !ok {
log.Printf("Error unsubscribing: %s route does not exist for %v subscription", topic, id)
} else {
delete(r, msg.Subscription)
delete(r, id)
if len(r) == 0 {
delete(br.routes, topic)
}
}
sub.Send(&Unsubscribed{Request: msg.Request})

// subscribers
ids := br.subscribers[sess][:0]
for _, id := range br.subscribers[sess] {
if id != id {
ids = append(ids, id)
}
}
if len(ids) == 0 {
delete(br.subscribers, sess)
} else {
br.subscribers[sess] = ids
}

return true
}
74 changes: 66 additions & 8 deletions broker_test.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,46 @@
package turnpike

import (
"sync"
"testing"

. "github.com/smartystreets/goconvey/convey"
)

type TestSender struct {
type TestPeer struct {
received Message
sent Message

sync.RWMutex
}

func (s *TestPeer) Send(msg Message) error {
s.Lock()
defer s.Unlock()

s.received = msg
return nil
}

func (s *TestSender) Send(msg Message) error { s.received = msg; return nil }
func (s *TestPeer) getReceived() Message {
s.RLock()
defer s.RUnlock()

return s.received
}

// TODO: implement me
func (s *TestPeer) Receive() <-chan Message { return nil }
func (s *TestPeer) Close() error { return nil }

func TestSubscribe(t *testing.T) {
Convey("Subscribing to a topic", t, func() {
broker := NewDefaultBroker().(*defaultBroker)
subscriber := &TestSender{}
subscriber := &TestPeer{}
sess := &Session{Peer: subscriber}
testTopic := URI("turnpike.test.topic")
msg := &Subscribe{Request: 123, Topic: testTopic}
broker.Subscribe(subscriber, msg)
broker.Subscribe(sess, msg)

Convey("The subscriber should have received a SUBSCRIBED message", func() {
sub := subscriber.received.(*Subscribed).Subscription
Expand All @@ -31,22 +53,58 @@ func TestSubscribe(t *testing.T) {
So(ok, ShouldBeTrue)
So(topic, ShouldEqual, testTopic)
})
})
}

func TestBrokerNextRequestId(t *testing.T) {
Convey("nextRequestId called multiple times", t, func() {
broker := NewDefaultBroker().(*defaultBroker)
id1 := broker.nextRequestId()
id2 := broker.nextRequestId()

// TODO: multiple subscribe requests?
So(id1, ShouldNotEqual, id2)
})

Convey("nextRequestId should roll over", t, func() {
broker := NewDefaultBroker().(*defaultBroker)
broker.lastRequestId = MAX_REQUEST_ID
id := broker.nextRequestId()

So(id, ShouldEqual, 1)
})
}

func TestMultipleSubscribe(t *testing.T) {
const SUBSCRIBERS = 2
const TOPIC = URI("turnpike.test.topic")

Convey("Multiple subscribers to a topic", t, func() {
broker := NewDefaultBroker().(*defaultBroker)
for i := 0; i < SUBSCRIBERS; i++ {
subscriber := &TestPeer{}
sess := &Session{Peer: subscriber, Id: NewID()}
msg := &Subscribe{Request: sess.NextRequestId(), Topic: TOPIC}
broker.Subscribe(sess, msg)
}

Convey("There should be a map entry for each subscriber", func() {
So(len(broker.routes[TOPIC]), ShouldEqual, SUBSCRIBERS)
})
})
}

func TestUnsubscribe(t *testing.T) {
broker := NewDefaultBroker().(*defaultBroker)
subscriber := &TestSender{}
subscriber := &TestPeer{}
testTopic := URI("turnpike.test.topic")
msg := &Subscribe{Request: 123, Topic: testTopic}
broker.Subscribe(subscriber, msg)
sess := &Session{Peer: subscriber}
broker.Subscribe(sess, msg)
sub := subscriber.received.(*Subscribed).Subscription

Convey("Unsubscribing from a topic", t, func() {
msg := &Unsubscribe{Request: 124, Subscription: sub}
broker.Unsubscribe(subscriber, msg)
broker.Unsubscribe(sess, msg)

Convey("The peer should have received an UNSUBSCRIBED message", func() {
unsub := subscriber.received.(*Unsubscribed).Request
Expand Down
Loading