From 239195fd160b7d61f688bb77590fc33b7e5a112a Mon Sep 17 00:00:00 2001
From: "T. Jameson Little" <t.jameson.little@gmail.com>
Date: Wed, 2 Dec 2015 15:29:03 -0700
Subject: [PATCH 01/25] All functions that took a Peer now take a *Session

- allows us to generate session-specific ids
---
 authorizer.go   |   4 +-
 broker.go       |  25 +++---
 broker_test.go  |  24 ++++--
 dealer.go       | 225 ++++++++++++++++++++++--------------------------
 dealer_test.go  |  57 ++++++------
 interceptor.go  |   4 +-
 realm.go        |  26 +++---
 router.go       |   2 +-
 session.go      |  16 +++-
 session_test.go |  21 +++++
 10 files changed, 215 insertions(+), 189 deletions(-)
 create mode 100644 session_test.go

diff --git a/authorizer.go b/authorizer.go
index 8a3f39e..a814498 100644
--- a/authorizer.go
+++ b/authorizer.go
@@ -6,7 +6,7 @@ package turnpike
 // Authorize takes the session and the message (request), and returns true if
 // the request is authorized, otherwise false.
 type Authorizer interface {
-	Authorize(session Session, msg Message) (bool, error)
+	Authorize(session *Session, msg Message) (bool, error)
 }
 
 // DefaultAuthorizer always returns authorized.
@@ -18,6 +18,6 @@ func NewDefaultAuthorizer() Authorizer {
 	return &defaultAuthorizer{}
 }
 
-func (da *defaultAuthorizer) Authorize(session Session, msg Message) (bool, error) {
+func (da *defaultAuthorizer) Authorize(session *Session, msg Message) (bool, error) {
 	return true, nil
 }
diff --git a/broker.go b/broker.go
index b7b7596..3c864d3 100644
--- a/broker.go
+++ b/broker.go
@@ -4,11 +4,11 @@ package turnpike
 // from Publishers to Subscribers.
 type Broker interface {
 	// Publishes a message to all Subscribers.
-	Publish(Sender, *Publish)
+	Publish(*Session, *Publish)
 	// Subscribes to messages on a URI.
-	Subscribe(Sender, *Subscribe)
+	Subscribe(*Session, *Subscribe)
 	// Unsubscribes from messages on a URI.
-	Unsubscribe(Sender, *Unsubscribe)
+	Unsubscribe(*Session, *Unsubscribe)
 }
 
 // A super simple broker that matches URIs to Subscribers.
@@ -30,8 +30,9 @@ func NewDefaultBroker() Broker {
 //
 // If msg.Options["acknowledge"] == true, the publisher receives a Published event
 // after the message has been sent to all subscribers.
-func (br *defaultBroker) Publish(pub Sender, msg *Publish) {
-	pubID := NewID()
+func (br *defaultBroker) Publish(sess *Session, msg *Publish) {
+	pub := sess.Peer
+	pubID := sess.NextRequestId()
 	evtTemplate := Event{
 		Publication: pubID,
 		Arguments:   msg.Arguments,
@@ -55,19 +56,19 @@ func (br *defaultBroker) Publish(pub Sender, msg *Publish) {
 }
 
 // Subscribe subscribes the client to the given topic.
-func (br *defaultBroker) Subscribe(sub Sender, msg *Subscribe) {
+func (br *defaultBroker) Subscribe(sess *Session, msg *Subscribe) {
 	if _, ok := br.routes[msg.Topic]; !ok {
 		br.routes[msg.Topic] = make(map[ID]Sender)
 	}
-	id := NewID()
-	br.routes[msg.Topic][id] = sub
+	id := sess.NextRequestId()
+	br.routes[msg.Topic][id] = sess.Peer
 
 	br.subscriptions[id] = msg.Topic
 
-	sub.Send(&Subscribed{Request: msg.Request, Subscription: id})
+	sess.Peer.Send(&Subscribed{Request: msg.Request, Subscription: id})
 }
 
-func (br *defaultBroker) Unsubscribe(sub Sender, msg *Unsubscribe) {
+func (br *defaultBroker) Unsubscribe(sess *Session, msg *Unsubscribe) {
 	topic, ok := br.subscriptions[msg.Subscription]
 	if !ok {
 		err := &Error{
@@ -75,7 +76,7 @@ func (br *defaultBroker) Unsubscribe(sub Sender, msg *Unsubscribe) {
 			Request: msg.Request,
 			Error:   ErrNoSuchSubscription,
 		}
-		sub.Send(err)
+		sess.Peer.Send(err)
 		log.Printf("Error unsubscribing: no such subscription %v", msg.Subscription)
 		return
 	}
@@ -91,5 +92,5 @@ func (br *defaultBroker) Unsubscribe(sub Sender, msg *Unsubscribe) {
 			delete(br.routes, topic)
 		}
 	}
-	sub.Send(&Unsubscribed{Request: msg.Request})
+	sess.Peer.Send(&Unsubscribed{Request: msg.Request})
 }
diff --git a/broker_test.go b/broker_test.go
index 5c8480c..d53807f 100644
--- a/broker_test.go
+++ b/broker_test.go
@@ -6,19 +6,28 @@ import (
 	. "github.com/smartystreets/goconvey/convey"
 )
 
-type TestSender struct {
+type TestPeer struct {
 	received Message
+	sent     Message
 }
 
-func (s *TestSender) Send(msg Message) error { s.received = msg; return nil }
+func (s *TestPeer) Send(msg Message) error {
+	s.received = msg
+	return nil
+}
+
+// TODO: implement me
+func (s *TestPeer) Receive() <-chan Message { return nil }
+func (s *TestPeer) Close() error            { return nil }
 
 func TestSubscribe(t *testing.T) {
 	Convey("Subscribing to a topic", t, func() {
 		broker := NewDefaultBroker().(*defaultBroker)
-		subscriber := &TestSender{}
+		subscriber := &TestPeer{}
+		sess := &Session{Peer: subscriber}
 		testTopic := URI("turnpike.test.topic")
 		msg := &Subscribe{Request: 123, Topic: testTopic}
-		broker.Subscribe(subscriber, msg)
+		broker.Subscribe(sess, msg)
 
 		Convey("The subscriber should have received a SUBSCRIBED message", func() {
 			sub := subscriber.received.(*Subscribed).Subscription
@@ -38,15 +47,16 @@ func TestSubscribe(t *testing.T) {
 
 func TestUnsubscribe(t *testing.T) {
 	broker := NewDefaultBroker().(*defaultBroker)
-	subscriber := &TestSender{}
+	subscriber := &TestPeer{}
 	testTopic := URI("turnpike.test.topic")
 	msg := &Subscribe{Request: 123, Topic: testTopic}
-	broker.Subscribe(subscriber, msg)
+	sess := &Session{Peer: subscriber}
+	broker.Subscribe(sess, msg)
 	sub := subscriber.received.(*Subscribed).Subscription
 
 	Convey("Unsubscribing from a topic", t, func() {
 		msg := &Unsubscribe{Request: 124, Subscription: sub}
-		broker.Unsubscribe(subscriber, msg)
+		broker.Unsubscribe(sess, msg)
 
 		Convey("The peer should have received an UNSUBSCRIBED message", func() {
 			unsub := subscriber.received.(*Unsubscribed).Request
diff --git a/dealer.go b/dealer.go
index e9fcd1e..d60c391 100644
--- a/dealer.go
+++ b/dealer.go
@@ -3,54 +3,56 @@ package turnpike
 // A Dealer routes and manages RPC calls to callees.
 type Dealer interface {
 	// Register a procedure on an endpoint
-	Register(Sender, *Register)
+	Register(*Session, *Register)
 	// Unregister a procedure on an endpoint
-	Unregister(Sender, *Unregister)
+	Unregister(*Session, *Unregister)
 	// Call a procedure on an endpoint
-	Call(Sender, *Call)
+	Call(*Session, *Call)
 	// Return the result of a procedure call
-	Yield(Sender, *Yield)
+	Yield(*Session, *Yield)
 	// Handle an ERROR message from an invocation
-	Error(Sender, *Error)
+	Error(*Session, *Error)
 	// Remove a callee's registrations
-	RemovePeer(Sender)
+	RemoveSession(*Session)
 }
 
 type remoteProcedure struct {
-	Endpoint  Sender
-	Procedure URI
+	Endpoint     *Session
+	Procedure    URI
+	Registration ID
+}
+
+type rpcRequest struct {
+	caller    *Session
+	requestId ID
 }
 
 type defaultDealer struct {
 	// map registration IDs to procedures
-	procedures map[ID]remoteProcedure
+	procedures map[URI]remoteProcedure
 	// map procedure URIs to registration IDs
 	// TODO: this will eventually need to be `map[URI][]ID` to support
 	// multiple callees for the same procedure
-	registrations map[URI]ID
-	// keep track of call IDs so we can send the response to the caller
-	calls map[ID]Sender
+	registrations map[ID]URI
+
 	// link the invocation ID to the call ID
-	invocations map[ID]ID
-	// keep track of callee's registrations
-	callees map[Sender]map[ID]bool
+	invocations map[*Session]map[ID]rpcRequest
+	// callees map[*Session]map[ID]bool
 }
 
 // NewDefaultDealer returns the default turnpike dealer implementation
 func NewDefaultDealer() Dealer {
 	return &defaultDealer{
-		procedures:    make(map[ID]remoteProcedure),
-		registrations: make(map[URI]ID),
-		calls:         make(map[ID]Sender),
-		invocations:   make(map[ID]ID),
-		callees:       make(map[Sender]map[ID]bool),
+		procedures:    make(map[URI]remoteProcedure),
+		registrations: make(map[ID]URI),
+		invocations:   make(map[*Session]map[ID]rpcRequest),
 	}
 }
 
-func (d *defaultDealer) Register(callee Sender, msg *Register) {
-	if id, ok := d.registrations[msg.Procedure]; ok {
+func (d *defaultDealer) Register(sess *Session, msg *Register) {
+	if id, ok := d.procedures[msg.Procedure]; ok {
 		log.Println("error: procedure already exists:", msg.Procedure, id)
-		callee.Send(&Error{
+		sess.Peer.Send(&Error{
 			Type:    msg.MessageType(),
 			Request: msg.Request,
 			Details: make(map[string]interface{}),
@@ -58,145 +60,120 @@ func (d *defaultDealer) Register(callee Sender, msg *Register) {
 		})
 		return
 	}
-	reg := NewID()
-	d.procedures[reg] = remoteProcedure{callee, msg.Procedure}
-	d.registrations[msg.Procedure] = reg
-	d.addCalleeRegistration(callee, reg)
-	log.Printf("registered procedure %v [%v]", reg, msg.Procedure)
-	callee.Send(&Registered{
+
+	registrationId := NewID()
+	d.procedures[msg.Procedure] = remoteProcedure{sess, msg.Procedure, registrationId}
+	d.registrations[registrationId] = msg.Procedure
+
+	// d.addCalleeRegistration(sess, reg)
+	log.Printf("registered procedure %v [%v]", registrationId, msg.Procedure)
+	sess.Peer.Send(&Registered{
 		Request:      msg.Request,
-		Registration: reg,
+		Registration: registrationId,
 	})
 }
 
-func (d *defaultDealer) Unregister(callee Sender, msg *Unregister) {
-	if procedure, ok := d.procedures[msg.Registration]; !ok {
+func (d *defaultDealer) Unregister(sess *Session, msg *Unregister) {
+	if procedure, ok := d.registrations[msg.Registration]; !ok {
 		// the registration doesn't exist
 		log.Println("error: no such registration:", msg.Registration)
-		callee.Send(&Error{
+		sess.Peer.Send(&Error{
 			Type:    msg.MessageType(),
 			Request: msg.Request,
 			Details: make(map[string]interface{}),
 			Error:   ErrNoSuchRegistration,
 		})
 	} else {
-		delete(d.registrations, procedure.Procedure)
-		delete(d.procedures, msg.Registration)
-		d.removeCalleeRegistration(callee, msg.Registration)
-		log.Printf("unregistered procedure %v [%v]", procedure.Procedure, msg.Registration)
-		callee.Send(&Unregistered{
+		delete(d.registrations, msg.Registration)
+		delete(d.procedures, procedure)
+		// d.removeCalleeRegistration(sess, msg.Registration)
+		log.Printf("unregistered procedure %v [%v]", procedure, msg.Registration)
+		sess.Peer.Send(&Unregistered{
 			Request: msg.Request,
 		})
 	}
 }
 
-func (d *defaultDealer) Call(caller Sender, msg *Call) {
-	if reg, ok := d.registrations[msg.Procedure]; !ok {
-		caller.Send(&Error{
+func (d *defaultDealer) Call(sess *Session, msg *Call) {
+	if rproc, ok := d.procedures[msg.Procedure]; !ok {
+		sess.Peer.Send(&Error{
 			Type:    msg.MessageType(),
 			Request: msg.Request,
 			Details: make(map[string]interface{}),
 			Error:   ErrNoSuchProcedure,
 		})
 	} else {
-		if rproc, ok := d.procedures[reg]; !ok {
-			// found a registration id, but doesn't match any remote procedure
-			caller.Send(&Error{
-				Type:    msg.MessageType(),
-				Request: msg.Request,
-				Details: make(map[string]interface{}),
-				// TODO: what should this error be?
-				Error: URI("wamp.error.internal_error"),
-			})
-		} else {
-			// everything checks out, make the invocation request
-			// TODO: make the Request ID specific to the caller
-			d.calls[msg.Request] = caller
-			invocationID := NewID()
-			d.invocations[invocationID] = msg.Request
-			rproc.Endpoint.Send(&Invocation{
-				Request:      invocationID,
-				Registration: reg,
-				Details:      map[string]interface{}{},
-				Arguments:    msg.Arguments,
-				ArgumentsKw:  msg.ArgumentsKw,
-			})
-			log.Printf("dispatched CALL %v [%v] to callee as INVOCATION %v",
-				msg.Request, msg.Procedure, invocationID,
-			)
+		// everything checks out, make the invocation request
+		// TODO: make the Request ID specific to the caller
+		// d.calls[msg.Request] = sess
+		invocationID := rproc.Endpoint.NextRequestId()
+		if d.invocations[rproc.Endpoint] == nil {
+			d.invocations[rproc.Endpoint] = make(map[ID]rpcRequest)
 		}
+		d.invocations[rproc.Endpoint][invocationID] = rpcRequest{sess, msg.Request}
+		rproc.Endpoint.Send(&Invocation{
+			Request:      invocationID,
+			Registration: rproc.Registration,
+			Details:      map[string]interface{}{},
+			Arguments:    msg.Arguments,
+			ArgumentsKw:  msg.ArgumentsKw,
+		})
+		log.Printf("dispatched CALL %v [%v] to callee as INVOCATION %v",
+			msg.Request, msg.Procedure, invocationID,
+		)
 	}
 }
 
-func (d *defaultDealer) Yield(callee Sender, msg *Yield) {
-	if callID, ok := d.invocations[msg.Request]; !ok {
+func (d *defaultDealer) Yield(sess *Session, msg *Yield) {
+	if d.invocations[sess] == nil {
+		log.Println("received YIELD message from unknown session:", sess.Id)
+		return
+	}
+	if call, ok := d.invocations[sess][msg.Request]; !ok {
 		// WAMP spec doesn't allow sending an error in response to a YIELD message
 		log.Println("received YIELD message with invalid invocation request ID:", msg.Request)
 	} else {
-		delete(d.invocations, msg.Request)
-		if caller, ok := d.calls[callID]; !ok {
-			// found the invocation id, but doesn't match any call id
-			// WAMP spec doesn't allow sending an error in response to a YIELD message
-			log.Printf("received YIELD message, but unable to match it (%v) to a CALL ID", msg.Request)
-		} else {
-			delete(d.calls, callID)
-			// return the result to the caller
-			caller.Send(&Result{
-				Request:     callID,
-				Details:     map[string]interface{}{},
-				Arguments:   msg.Arguments,
-				ArgumentsKw: msg.ArgumentsKw,
-			})
-			log.Printf("returned YIELD %v to caller as RESULT %v", msg.Request, callID)
-		}
+		// TODO: delete old keys; could do here, but should probably lock the map
+		delete(d.invocations[sess], msg.Request)
+		// return the result to the caller
+		call.caller.Send(&Result{
+			Request:     call.requestId,
+			Details:     map[string]interface{}{},
+			Arguments:   msg.Arguments,
+			ArgumentsKw: msg.ArgumentsKw,
+		})
+		log.Printf("returned YIELD %v to caller as RESULT %v", msg.Request, call.requestId)
 	}
 }
 
-func (d *defaultDealer) Error(peer Sender, msg *Error) {
-	if callID, ok := d.invocations[msg.Request]; !ok {
+func (d *defaultDealer) Error(sess *Session, msg *Error) {
+	if d.invocations[sess] == nil {
+		log.Println("received YIELD message from unknown session:", sess.Id)
+		return
+	}
+	if call, ok := d.invocations[sess][msg.Request]; !ok {
 		log.Println("received ERROR (INVOCATION) message with invalid invocation request ID:", msg.Request)
 	} else {
-		delete(d.invocations, msg.Request)
-		if caller, ok := d.calls[callID]; !ok {
-			log.Printf("received ERROR (INVOCATION) message, but unable to match it (%v) to a CALL ID", msg.Request)
-		} else {
-			delete(d.calls, callID)
-			// return an error to the caller
-			caller.Send(&Error{
-				Type:        CALL,
-				Request:     callID,
-				Details:     make(map[string]interface{}),
-				Arguments:   msg.Arguments,
-				ArgumentsKw: msg.ArgumentsKw,
-			})
-			log.Printf("returned ERROR %v to caller as ERROR %v", msg.Request, callID)
-		}
+		delete(d.invocations[sess], msg.Request)
+		// return an error to the caller
+		call.caller.Peer.Send(&Error{
+			Type:        CALL,
+			Request:     call.requestId,
+			Details:     make(map[string]interface{}),
+			Arguments:   msg.Arguments,
+			ArgumentsKw: msg.ArgumentsKw,
+		})
+		log.Printf("returned ERROR %v to caller as ERROR %v", msg.Request, call.requestId)
 	}
 }
 
-func (d *defaultDealer) RemovePeer(callee Sender) {
-	for reg := range d.callees[callee] {
-		if procedure, ok := d.procedures[reg]; ok {
-			delete(d.registrations, procedure.Procedure)
-			delete(d.procedures, reg)
+func (d *defaultDealer) RemoveSession(sess *Session) {
+	// TODO: this is low hanging fruit for optimization
+	// TODO: potential data race here, should lock the map
+	for _, rproc := range d.procedures {
+		if rproc.Endpoint == sess {
+			delete(d.registrations, rproc.Registration)
+			delete(d.procedures, rproc.Procedure)
 		}
-		d.removeCalleeRegistration(callee, reg)
-	}
-}
-
-func (d *defaultDealer) addCalleeRegistration(callee Sender, reg ID) {
-	if _, ok := d.callees[callee]; !ok {
-		d.callees[callee] = make(map[ID]bool)
-	}
-	d.callees[callee][reg] = true
-}
-
-func (d *defaultDealer) removeCalleeRegistration(callee Sender, reg ID) {
-	if _, ok := d.callees[callee]; !ok {
-		return
-	}
-	delete(d.callees[callee], reg)
-	if len(d.callees[callee]) == 0 {
-		delete(d.callees, callee)
 	}
 }
diff --git a/dealer_test.go b/dealer_test.go
index e5ccbad..b862459 100644
--- a/dealer_test.go
+++ b/dealer_test.go
@@ -9,10 +9,11 @@ import (
 func TestRegister(t *testing.T) {
 	Convey("Registering a procedure", t, func() {
 		dealer := NewDefaultDealer().(*defaultDealer)
-		callee := &TestSender{}
+		callee := &TestPeer{}
 		testProcedure := URI("turnpike.test.endpoint")
 		msg := &Register{Request: 123, Procedure: testProcedure}
-		dealer.Register(callee, msg)
+		sess := &Session{Peer: callee}
+		dealer.Register(sess, msg)
 
 		Convey("The callee should have received a REGISTERED message", func() {
 			reg := callee.received.(*Registered).Registration
@@ -21,17 +22,17 @@ func TestRegister(t *testing.T) {
 
 		Convey("The dealer should have the endpoint registered", func() {
 			reg := callee.received.(*Registered).Registration
-			reg2, ok := dealer.registrations[testProcedure]
+			proc, ok := dealer.procedures[testProcedure]
 			So(ok, ShouldBeTrue)
-			So(reg, ShouldEqual, reg2)
-			proc, ok := dealer.procedures[reg]
+			So(reg, ShouldEqual, proc.Registration)
+			procedure, ok := dealer.registrations[reg]
 			So(ok, ShouldBeTrue)
-			So(proc.Procedure, ShouldEqual, testProcedure)
+			So(procedure, ShouldEqual, testProcedure)
 		})
 
 		Convey("The same procedure cannot be registered more than once", func() {
 			msg := &Register{Request: 321, Procedure: testProcedure}
-			dealer.Register(callee, msg)
+			dealer.Register(sess, msg)
 			err := callee.received.(*Error)
 			So(err.Error, ShouldEqual, ErrProcedureAlreadyExists)
 			So(err.Details, ShouldNotBeNil)
@@ -41,15 +42,16 @@ func TestRegister(t *testing.T) {
 
 func TestUnregister(t *testing.T) {
 	dealer := NewDefaultDealer().(*defaultDealer)
-	callee := &TestSender{}
+	callee := &TestPeer{}
 	testProcedure := URI("turnpike.test.endpoint")
 	msg := &Register{Request: 123, Procedure: testProcedure}
-	dealer.Register(callee, msg)
+	sess := &Session{Peer: callee}
+	dealer.Register(sess, msg)
 	reg := callee.received.(*Registered).Registration
 
 	Convey("Unregistering a procedure", t, func() {
 		msg := &Unregister{Request: 124, Registration: reg}
-		dealer.Unregister(callee, msg)
+		dealer.Unregister(sess, msg)
 
 		Convey("The callee should have received an UNREGISTERED message", func() {
 			unreg := callee.received.(*Unregistered).Request
@@ -57,9 +59,9 @@ func TestUnregister(t *testing.T) {
 		})
 
 		Convey("The dealer should no longer have the endpoint registered", func() {
-			_, ok := dealer.registrations[testProcedure]
+			_, ok := dealer.procedures[testProcedure]
 			So(ok, ShouldBeFalse)
-			_, ok = dealer.procedures[reg]
+			_, ok = dealer.registrations[reg]
 			So(ok, ShouldBeFalse)
 		})
 	})
@@ -68,15 +70,17 @@ func TestUnregister(t *testing.T) {
 func TestCall(t *testing.T) {
 	Convey("With a procedure registered", t, func() {
 		dealer := NewDefaultDealer().(*defaultDealer)
-		callee := &TestSender{}
+		callee := &TestPeer{}
 		testProcedure := URI("turnpike.test.endpoint")
 		msg := &Register{Request: 123, Procedure: testProcedure}
-		dealer.Register(callee, msg)
-		caller := &TestSender{}
+		sess := &Session{Peer: callee}
+		dealer.Register(sess, msg)
+		caller := &TestPeer{}
+		callerSession := &Session{Peer: caller}
 
 		Convey("Calling an invalid procedure", func() {
 			msg := &Call{Request: 124, Procedure: URI("turnpike.test.bad")}
-			dealer.Call(caller, msg)
+			dealer.Call(callerSession, msg)
 
 			Convey("The caller should have received an ERROR message", func() {
 				err := caller.received.(*Error)
@@ -87,7 +91,7 @@ func TestCall(t *testing.T) {
 
 		Convey("Calling a valid procedure", func() {
 			msg := &Call{Request: 125, Procedure: testProcedure}
-			dealer.Call(caller, msg)
+			dealer.Call(callerSession, msg)
 
 			Convey("The callee should have received an INVOCATION message", func() {
 				So(callee.received.MessageType(), ShouldEqual, INVOCATION)
@@ -95,7 +99,7 @@ func TestCall(t *testing.T) {
 
 				Convey("And the callee responds with a YIELD message", func() {
 					msg := &Yield{Request: inv.Request}
-					dealer.Yield(callee, msg)
+					dealer.Yield(sess, msg)
 
 					Convey("The caller should have received a RESULT message", func() {
 						So(caller.received.MessageType(), ShouldEqual, RESULT)
@@ -105,7 +109,7 @@ func TestCall(t *testing.T) {
 
 				Convey("And the callee responds with an ERROR message", func() {
 					msg := &Error{Request: inv.Request}
-					dealer.Error(callee, msg)
+					dealer.Error(sess, msg)
 
 					Convey("The caller should have received an ERROR message", func() {
 						So(caller.received.MessageType(), ShouldEqual, ERROR)
@@ -120,24 +124,23 @@ func TestCall(t *testing.T) {
 func TestRemovePeer(t *testing.T) {
 	Convey("With a procedure registered", t, func() {
 		dealer := NewDefaultDealer().(*defaultDealer)
-		callee := &TestSender{}
+		callee := &TestPeer{}
 		testProcedure := URI("turnpike.test.endpoint")
 		msg := &Register{Request: 123, Procedure: testProcedure}
-		dealer.Register(callee, msg)
+		sess := &Session{Peer: callee}
+		dealer.Register(sess, msg)
 		reg := callee.received.(*Registered).Registration
-		So(dealer.registrations, ShouldContainKey, testProcedure)
-		So(dealer.procedures, ShouldContainKey, reg)
-		So(dealer.callees[callee], ShouldContainKey, reg)
+		So(dealer.procedures, ShouldContainKey, testProcedure)
+		So(dealer.registrations, ShouldContainKey, reg)
 
 		Convey("Calling RemoveSession should remove the registration", func() {
-			dealer.RemovePeer(callee)
+			dealer.RemoveSession(sess)
 			So(dealer.registrations, ShouldNotContainKey, testProcedure)
 			So(dealer.procedures, ShouldNotContainKey, reg)
-			So(dealer.callees[callee], ShouldNotContainKey, reg)
 
 			Convey("And registering the endpoint again should succeed", func() {
 				msg.Request = 124
-				dealer.Register(callee, msg)
+				dealer.Register(sess, msg)
 				So(callee.received.MessageType(), ShouldEqual, REGISTERED)
 			})
 		})
diff --git a/interceptor.go b/interceptor.go
index 52bae31..107510e 100644
--- a/interceptor.go
+++ b/interceptor.go
@@ -6,7 +6,7 @@ package turnpike
 // Intercept takes the session and (a pointer to) the message, and (possibly)
 // modifies the message.
 type Interceptor interface {
-	Intercept(session Session, msg *Message)
+	Intercept(session *Session, msg *Message)
 }
 
 // DefaultInterceptor does nothing :)
@@ -18,5 +18,5 @@ func NewDefaultInterceptor() Interceptor {
 	return &defaultInterceptor{}
 }
 
-func (di *defaultInterceptor) Intercept(session Session, msg *Message) {
+func (di *defaultInterceptor) Intercept(session *Session, msg *Message) {
 }
diff --git a/realm.go b/realm.go
index 0eed0c6..12f00c4 100644
--- a/realm.go
+++ b/realm.go
@@ -24,7 +24,7 @@ type Realm struct {
 	Authenticators   map[string]Authenticator
 	// DefaultAuth      func(details map[string]interface{}) (map[string]interface{}, error)
 	AuthTimeout time.Duration
-	clients     map[ID]Session
+	clients     map[ID]*Session
 	localClient
 }
 
@@ -34,7 +34,7 @@ type localClient struct {
 
 func (r *Realm) getPeer(details map[string]interface{}) (Peer, error) {
 	peerA, peerB := localPipe()
-	sess := Session{Peer: peerA, Id: NewID(), Details: details, kill: make(chan URI, 1)}
+	sess := &Session{Peer: peerA, Id: NewID(), Details: details, kill: make(chan URI, 1)}
 	if details == nil {
 		details = make(map[string]interface{})
 	}
@@ -51,7 +51,7 @@ func (r Realm) Close() {
 }
 
 func (r *Realm) init() {
-	r.clients = make(map[ID]Session)
+	r.clients = make(map[ID]*Session)
 	p, _ := r.getPeer(nil)
 	r.localClient.Client = NewClient(p)
 	if r.Broker == nil {
@@ -82,12 +82,12 @@ func (l *localClient) onLeave(session ID) {
 	l.Publish("wamp.session.on_leave", []interface{}{session}, nil)
 }
 
-func (r *Realm) handleSession(sess Session) {
+func (r *Realm) handleSession(sess *Session) {
 	r.clients[sess.Id] = sess
 	r.onJoin(sess.Details)
 	defer func() {
 		delete(r.clients, sess.Id)
-		r.Dealer.RemovePeer(sess.Peer)
+		r.Dealer.RemoveSession(sess)
 		r.onLeave(sess.Id)
 	}()
 	c := sess.Receive()
@@ -133,27 +133,27 @@ func (r *Realm) handleSession(sess Session) {
 
 		// Broker messages
 		case *Publish:
-			r.Broker.Publish(sess.Peer, msg)
+			r.Broker.Publish(sess, msg)
 		case *Subscribe:
-			r.Broker.Subscribe(sess.Peer, msg)
+			r.Broker.Subscribe(sess, msg)
 		case *Unsubscribe:
-			r.Broker.Unsubscribe(sess.Peer, msg)
+			r.Broker.Unsubscribe(sess, msg)
 
 		// Dealer messages
 		case *Register:
-			r.Dealer.Register(sess.Peer, msg)
+			r.Dealer.Register(sess, msg)
 		case *Unregister:
-			r.Dealer.Unregister(sess.Peer, msg)
+			r.Dealer.Unregister(sess, msg)
 		case *Call:
-			r.Dealer.Call(sess.Peer, msg)
+			r.Dealer.Call(sess, msg)
 		case *Yield:
-			r.Dealer.Yield(sess.Peer, msg)
+			r.Dealer.Yield(sess, msg)
 
 		// Error messages
 		case *Error:
 			if msg.Type == INVOCATION {
 				// the only type of ERROR message the router should receive
-				r.Dealer.Error(sess.Peer, msg)
+				r.Dealer.Error(sess, msg)
 			} else {
 				log.Printf("invalid ERROR message received: %v", msg)
 			}
diff --git a/router.go b/router.go
index 9eddec3..ee06c50 100644
--- a/router.go
+++ b/router.go
@@ -149,7 +149,7 @@ func (r *defaultRouter) Accept(client Peer) error {
 	// session details
 	welcome.Details["session"] = welcome.Id
 	welcome.Details["realm"] = hello.Realm
-	sess := Session{
+	sess := &Session{
 		Peer:    client,
 		Id:      welcome.Id,
 		Details: welcome.Details,
diff --git a/session.go b/session.go
index 42564c4..a9a0237 100644
--- a/session.go
+++ b/session.go
@@ -4,19 +4,33 @@ import (
 	"fmt"
 )
 
+const (
+	MAX_REQUEST_ID = 1 << 53
+)
+
 // Session represents an active WAMP session
 type Session struct {
 	Peer
 	Id      ID
 	Details map[string]interface{}
 
-	kill chan URI
+	lastRequestId ID
+	kill          chan URI
 }
 
 func (s Session) String() string {
 	return fmt.Sprintf("%d", s.Id)
 }
 
+func (s *Session) NextRequestId() ID {
+	s.lastRequestId++
+	// max value is 2^53
+	if s.lastRequestId > MAX_REQUEST_ID {
+		s.lastRequestId = 1
+	}
+	return s.lastRequestId
+}
+
 // localPipe creates two linked sessions. Messages sent to one will
 // appear in the Receive of the other. This is useful for implementing
 // client sessions
diff --git a/session_test.go b/session_test.go
new file mode 100644
index 0000000..9518a14
--- /dev/null
+++ b/session_test.go
@@ -0,0 +1,21 @@
+package turnpike
+
+import (
+	"testing"
+
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+func TestNextRequestId(t *testing.T) {
+	Convey("Incrementing session id", t, func() {
+		sess := &Session{}
+		Convey("Should increment on subsequent calls", func() {
+			So(sess.NextRequestId(), ShouldEqual, 1)
+			So(sess.NextRequestId(), ShouldEqual, 2)
+		})
+		Convey("Should roll over upon reaching the max session id size", func() {
+			sess.lastRequestId = MAX_REQUEST_ID
+			So(sess.NextRequestId(), ShouldEqual, 1)
+		})
+	})
+}

From 4621c9349b1cd4400c0ce8acc9d1b5f8ab1e0fd6 Mon Sep 17 00:00:00 2001
From: "T. Jameson Little" <t.jameson.little@gmail.com>
Date: Tue, 12 Jan 2016 15:39:35 -0700
Subject: [PATCH 02/25] Fix websocket send concurrency issue

- fixes a problem when running with AutobahnPython and twisted
---
 websocket.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/websocket.go b/websocket.go
index b34d908..b66088c 100644
--- a/websocket.go
+++ b/websocket.go
@@ -2,6 +2,7 @@ package turnpike
 
 import (
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/gorilla/websocket"
@@ -13,6 +14,7 @@ type websocketPeer struct {
 	messages    chan Message
 	payloadType int
 	closed      bool
+	mutex       sync.Mutex
 }
 
 // NewWebsocketPeer connects to the websocket server at the specified url.
@@ -56,6 +58,8 @@ func (ep *websocketPeer) Send(msg Message) error {
 	if err != nil {
 		return err
 	}
+	ep.mutex.Lock()
+	defer ep.mutex.Unlock()
 	return ep.conn.WriteMessage(ep.payloadType, b)
 }
 func (ep *websocketPeer) Receive() <-chan Message {

From 695927032d29e2855b06c2d6582165dc5b9c06a0 Mon Sep 17 00:00:00 2001
From: "T. Jameson Little" <t.jameson.little@gmail.com>
Date: Wed, 13 Jan 2016 11:48:26 -0700
Subject: [PATCH 03/25] Added mutex to invocation map

- prevent data race; maps aren't safe for concurrent use
---
 dealer.go | 35 +++++++++++++++++++++++++++++++----
 1 file changed, 31 insertions(+), 4 deletions(-)

diff --git a/dealer.go b/dealer.go
index d60c391..4e39938 100644
--- a/dealer.go
+++ b/dealer.go
@@ -1,5 +1,9 @@
 package turnpike
 
+import (
+	"sync"
+)
+
 // A Dealer routes and manages RPC calls to callees.
 type Dealer interface {
 	// Register a procedure on an endpoint
@@ -38,6 +42,10 @@ type defaultDealer struct {
 	// link the invocation ID to the call ID
 	invocations map[*Session]map[ID]rpcRequest
 	// callees map[*Session]map[ID]bool
+
+	// single lock for all invocations; could use RWLock, but in most (all?) cases we want a write lock
+	// TODO: add the lock per session
+	invocationLock sync.Mutex
 }
 
 // NewDefaultDealer returns the default turnpike dealer implementation
@@ -95,6 +103,9 @@ func (d *defaultDealer) Unregister(sess *Session, msg *Unregister) {
 }
 
 func (d *defaultDealer) Call(sess *Session, msg *Call) {
+	d.invocationLock.Lock()
+	defer d.invocationLock.Unlock()
+
 	if rproc, ok := d.procedures[msg.Procedure]; !ok {
 		sess.Peer.Send(&Error{
 			Type:    msg.MessageType(),
@@ -125,6 +136,9 @@ func (d *defaultDealer) Call(sess *Session, msg *Call) {
 }
 
 func (d *defaultDealer) Yield(sess *Session, msg *Yield) {
+	d.invocationLock.Lock()
+	defer d.invocationLock.Unlock()
+
 	if d.invocations[sess] == nil {
 		log.Println("received YIELD message from unknown session:", sess.Id)
 		return
@@ -133,10 +147,11 @@ func (d *defaultDealer) Yield(sess *Session, msg *Yield) {
 		// WAMP spec doesn't allow sending an error in response to a YIELD message
 		log.Println("received YIELD message with invalid invocation request ID:", msg.Request)
 	} else {
-		// TODO: delete old keys; could do here, but should probably lock the map
+		// delete old keys
 		delete(d.invocations[sess], msg.Request)
+
 		// return the result to the caller
-		call.caller.Send(&Result{
+		go call.caller.Send(&Result{
 			Request:     call.requestId,
 			Details:     map[string]interface{}{},
 			Arguments:   msg.Arguments,
@@ -144,19 +159,27 @@ func (d *defaultDealer) Yield(sess *Session, msg *Yield) {
 		})
 		log.Printf("returned YIELD %v to caller as RESULT %v", msg.Request, call.requestId)
 	}
+
+	if len(d.invocations[sess]) == 0 {
+		delete(d.invocations, sess)
+	}
 }
 
 func (d *defaultDealer) Error(sess *Session, msg *Error) {
+	d.invocationLock.Lock()
+	defer d.invocationLock.Unlock()
+
 	if d.invocations[sess] == nil {
-		log.Println("received YIELD message from unknown session:", sess.Id)
+		log.Println("received ERROR message from unknown session:", sess.Id)
 		return
 	}
 	if call, ok := d.invocations[sess][msg.Request]; !ok {
 		log.Println("received ERROR (INVOCATION) message with invalid invocation request ID:", msg.Request)
 	} else {
 		delete(d.invocations[sess], msg.Request)
+
 		// return an error to the caller
-		call.caller.Peer.Send(&Error{
+		go call.caller.Peer.Send(&Error{
 			Type:        CALL,
 			Request:     call.requestId,
 			Details:     make(map[string]interface{}),
@@ -165,6 +188,10 @@ func (d *defaultDealer) Error(sess *Session, msg *Error) {
 		})
 		log.Printf("returned ERROR %v to caller as ERROR %v", msg.Request, call.requestId)
 	}
+
+	if len(d.invocations[sess]) == 0 {
+		delete(d.invocations, sess)
+	}
 }
 
 func (d *defaultDealer) RemoveSession(sess *Session) {

From 1f80efd85a11943c37d4d592cb8be80315785ed3 Mon Sep 17 00:00:00 2001
From: "T. Jameson Little" <t.jameson.little@gmail.com>
Date: Wed, 13 Jan 2016 16:27:38 -0700
Subject: [PATCH 04/25] Try increasing buffers

---
 session.go          | 4 ++--
 websocket.go        | 2 +-
 websocket_server.go | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/session.go b/session.go
index a9a0237..0912bce 100644
--- a/session.go
+++ b/session.go
@@ -35,8 +35,8 @@ func (s *Session) NextRequestId() ID {
 // appear in the Receive of the other. This is useful for implementing
 // client sessions
 func localPipe() (*localPeer, *localPeer) {
-	aToB := make(chan Message, 10)
-	bToA := make(chan Message, 10)
+	aToB := make(chan Message, 100)
+	bToA := make(chan Message, 100)
 
 	a := &localPeer{
 		incoming: bToA,
diff --git a/websocket.go b/websocket.go
index b66088c..f78c986 100644
--- a/websocket.go
+++ b/websocket.go
@@ -43,7 +43,7 @@ func newWebsocketPeer(url, protocol, origin string, serializer Serializer, paylo
 	}
 	ep := &websocketPeer{
 		conn:        conn,
-		messages:    make(chan Message, 10),
+		messages:    make(chan Message, 100),
 		serializer:  serializer,
 		payloadType: payloadType,
 	}
diff --git a/websocket_server.go b/websocket_server.go
index 14849a9..c04414c 100644
--- a/websocket_server.go
+++ b/websocket_server.go
@@ -137,7 +137,7 @@ func (s *WebsocketServer) handleWebsocket(conn *websocket.Conn) {
 	peer := websocketPeer{
 		conn:        conn,
 		serializer:  serializer,
-		messages:    make(chan Message, 10),
+		messages:    make(chan Message, 100),
 		payloadType: payloadType,
 	}
 	go peer.run()

From 7731912e098a4338cc96aa630c0fb6cd4ee909eb Mon Sep 17 00:00:00 2001
From: "T. Jameson Little" <t.jameson.little@gmail.com>
Date: Thu, 21 Jan 2016 16:16:50 -0700
Subject: [PATCH 05/25] Add error URI to errors in RPC commands

---
 dealer.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/dealer.go b/dealer.go
index 4e39938..dcfc81b 100644
--- a/dealer.go
+++ b/dealer.go
@@ -182,6 +182,7 @@ func (d *defaultDealer) Error(sess *Session, msg *Error) {
 		go call.caller.Peer.Send(&Error{
 			Type:        CALL,
 			Request:     call.requestId,
+			Error:       msg.Error,
 			Details:     make(map[string]interface{}),
 			Arguments:   msg.Arguments,
 			ArgumentsKw: msg.ArgumentsKw,

From dd3c64aca27d322c9cd8983c45321947c5d60e6a Mon Sep 17 00:00:00 2001
From: JakobGreen <jakeg@spotterrf.com>
Date: Fri, 5 Feb 2016 14:44:19 -0700
Subject: [PATCH 06/25] Fixed error reporting on RPC calls

---
 client.go | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/client.go b/client.go
index e59fb0b..ff23500 100644
--- a/client.go
+++ b/client.go
@@ -482,6 +482,15 @@ func (c *Client) Publish(topic string, args []interface{}, kwargs map[string]int
 	})
 }
 
+type RPCError struct {
+	ErrorMessage *Error
+	Procedure    string
+}
+
+func (rpc RPCError) Error() string {
+	return fmt.Sprintf("error calling procedure '%v': %v: %v: %v", rpc.Procedure, rpc.ErrorMessage.Error, rpc.ErrorMessage.Arguments, rpc.ErrorMessage.ArgumentsKw)
+}
+
 // Call calls a procedure given a URI.
 func (c *Client) Call(procedure string, args []interface{}, kwargs map[string]interface{}) (*Result, error) {
 	id := NewID()
@@ -503,7 +512,7 @@ func (c *Client) Call(procedure string, args []interface{}, kwargs map[string]in
 	if err != nil {
 		return nil, err
 	} else if e, ok := msg.(*Error); ok {
-		return nil, fmt.Errorf("error calling procedure '%v': %v", procedure, e.Error)
+		return nil, RPCError{e, procedure}
 	} else if result, ok := msg.(*Result); !ok {
 		return nil, fmt.Errorf(formatUnexpectedMessage(msg, RESULT))
 	} else {

From 96f011e1c1fc3d75d7d3df0fb22218bd0cfae35a Mon Sep 17 00:00:00 2001
From: gngeorgiev <gngeorgiev.it@gmail.com>
Date: Sat, 26 Mar 2016 01:39:33 +0200
Subject: [PATCH 07/25] fixed concurrent writes to map in realm

---
 dealer_test.go |  9 +++++++++
 realm.go       | 25 ++++++++++++++++---------
 2 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/dealer_test.go b/dealer_test.go
index b862459..49cb054 100644
--- a/dealer_test.go
+++ b/dealer_test.go
@@ -2,6 +2,7 @@ package turnpike
 
 import (
 	"testing"
+	"time"
 
 	. "github.com/smartystreets/goconvey/convey"
 )
@@ -101,7 +102,11 @@ func TestCall(t *testing.T) {
 					msg := &Yield{Request: inv.Request}
 					dealer.Yield(sess, msg)
 
+					// give it some time to propagate
+					time.Sleep(time.Millisecond)
+
 					Convey("The caller should have received a RESULT message", func() {
+						So(caller.received, ShouldNotBeNil)
 						So(caller.received.MessageType(), ShouldEqual, RESULT)
 						So(caller.received.(*Result).Request, ShouldEqual, 125)
 					})
@@ -111,7 +116,11 @@ func TestCall(t *testing.T) {
 					msg := &Error{Request: inv.Request}
 					dealer.Error(sess, msg)
 
+					// give it some time to propagate
+					time.Sleep(time.Millisecond)
+
 					Convey("The caller should have received an ERROR message", func() {
+						So(caller.received, ShouldNotBeNil)
 						So(caller.received.MessageType(), ShouldEqual, ERROR)
 						So(caller.received.(*Error).Request, ShouldEqual, 125)
 					})
diff --git a/realm.go b/realm.go
index 12f00c4..433686b 100644
--- a/realm.go
+++ b/realm.go
@@ -3,6 +3,8 @@ package turnpike
 import (
 	"fmt"
 	"time"
+
+	"github.com/streamrail/concurrent-map"
 )
 
 const (
@@ -24,7 +26,7 @@ type Realm struct {
 	Authenticators   map[string]Authenticator
 	// DefaultAuth      func(details map[string]interface{}) (map[string]interface{}, error)
 	AuthTimeout time.Duration
-	clients     map[ID]*Session
+	clients     cmap.ConcurrentMap
 	localClient
 }
 
@@ -45,13 +47,18 @@ func (r *Realm) getPeer(details map[string]interface{}) (Peer, error) {
 
 // Close disconnects all clients after sending a goodbye message
 func (r Realm) Close() {
-	for _, client := range r.clients {
-		client.kill <- ErrSystemShutdown
+	iter := r.clients.Iter()
+	for client := range iter {
+		sess, isSession := client.Val.(*Session)
+		if !isSession {
+			continue
+		}
+		sess.kill <- ErrSystemShutdown
 	}
 }
 
 func (r *Realm) init() {
-	r.clients = make(map[ID]*Session)
+	r.clients = cmap.New()
 	p, _ := r.getPeer(nil)
 	r.localClient.Client = NewClient(p)
 	if r.Broker == nil {
@@ -83,10 +90,10 @@ func (l *localClient) onLeave(session ID) {
 }
 
 func (r *Realm) handleSession(sess *Session) {
-	r.clients[sess.Id] = sess
+	r.clients.Set(string(sess.Id), sess)
 	r.onJoin(sess.Details)
 	defer func() {
-		delete(r.clients, sess.Id)
+		r.clients.Remove(string(sess.Id))
 		r.Dealer.RemoveSession(sess)
 		r.onLeave(sess.Id)
 	}()
@@ -255,9 +262,9 @@ func addAuthMethod(details map[string]interface{}, method string) map[string]int
 }
 
 // r := Realm{
-// 	Authenticators: map[string]turnpike.Authenticator{
-// 		"wampcra": turnpike.NewCRAAuthenticatorFactoryFactory(mySecret),
-// 		"ticket": turnpike.NewTicketAuthenticator(myTicket),
+// 	Authenticators: map[string]gowamp.Authenticator{
+// 		"wampcra": gowamp.NewCRAAuthenticatorFactoryFactory(mySecret),
+// 		"ticket": gowamp.NewTicketAuthenticator(myTicket),
 // 		"asdfasdf": myAsdfAuthenticator,
 // 	},
 // 	BasicAuthenticators: map[string]turnpike.BasicAuthenticator{

From 742b9c3bef3c0bf76b26cbad78c3bf299690971d Mon Sep 17 00:00:00 2001
From: gngeorgiev <gngeorgiev.it@gmail.com>
Date: Sat, 26 Mar 2016 01:30:06 +0200
Subject: [PATCH 08/25] added exclude_me support

---
 broker.go | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/broker.go b/broker.go
index 3c864d3..9060132 100644
--- a/broker.go
+++ b/broker.go
@@ -39,12 +39,18 @@ func (br *defaultBroker) Publish(sess *Session, msg *Publish) {
 		ArgumentsKw: msg.ArgumentsKw,
 		Details:     make(map[string]interface{}),
 	}
+
+	excludePublisher := true
+	if exclude, ok := msg.Options["exclude_me"].(bool); ok {
+		excludePublisher = exclude
+	}
+
 	for id, sub := range br.routes[msg.Topic] {
 		// shallow-copy the template
 		event := evtTemplate
 		event.Subscription = id
 		// don't send event to publisher
-		if sub != pub {
+		if sub != pub || !excludePublisher {
 			sub.Send(&event)
 		}
 	}

From bf55c97447e62b9d7c32846451f70e60088a9c4b Mon Sep 17 00:00:00 2001
From: "T. Jameson Little" <t.jameson.little@gmail.com>
Date: Thu, 31 Mar 2016 14:11:26 -0600
Subject: [PATCH 09/25] Fixed data races

---
 broker.go      |  15 ++++
 broker_test.go |  13 ++++
 data_races.txt |  44 ++++++++++++
 dealer.go      |  14 ++++
 dealer_test.go |  14 ++--
 realm.go       | 188 ++++++++++++++++++++++++++++---------------------
 router.go      |  38 ++++++----
 7 files changed, 228 insertions(+), 98 deletions(-)
 create mode 100644 data_races.txt

diff --git a/broker.go b/broker.go
index 9060132..b28827c 100644
--- a/broker.go
+++ b/broker.go
@@ -1,5 +1,9 @@
 package turnpike
 
+import (
+	"sync"
+)
+
 // Broker is the interface implemented by an object that handles routing EVENTS
 // from Publishers to Subscribers.
 type Broker interface {
@@ -15,6 +19,8 @@ type Broker interface {
 type defaultBroker struct {
 	routes        map[URI]map[ID]Sender
 	subscriptions map[ID]URI
+
+	sync.RWMutex
 }
 
 // NewDefaultBroker initializes and returns a simple broker that matches URIs to
@@ -31,6 +37,9 @@ func NewDefaultBroker() Broker {
 // If msg.Options["acknowledge"] == true, the publisher receives a Published event
 // after the message has been sent to all subscribers.
 func (br *defaultBroker) Publish(sess *Session, msg *Publish) {
+	br.RLock()
+	defer br.RUnlock()
+
 	pub := sess.Peer
 	pubID := sess.NextRequestId()
 	evtTemplate := Event{
@@ -63,6 +72,9 @@ func (br *defaultBroker) Publish(sess *Session, msg *Publish) {
 
 // Subscribe subscribes the client to the given topic.
 func (br *defaultBroker) Subscribe(sess *Session, msg *Subscribe) {
+	br.Lock()
+	defer br.Unlock()
+
 	if _, ok := br.routes[msg.Topic]; !ok {
 		br.routes[msg.Topic] = make(map[ID]Sender)
 	}
@@ -75,6 +87,9 @@ func (br *defaultBroker) Subscribe(sess *Session, msg *Subscribe) {
 }
 
 func (br *defaultBroker) Unsubscribe(sess *Session, msg *Unsubscribe) {
+	br.Lock()
+	defer br.Unlock()
+
 	topic, ok := br.subscriptions[msg.Subscription]
 	if !ok {
 		err := &Error{
diff --git a/broker_test.go b/broker_test.go
index d53807f..d428ee9 100644
--- a/broker_test.go
+++ b/broker_test.go
@@ -1,6 +1,7 @@
 package turnpike
 
 import (
+	"sync"
 	"testing"
 
 	. "github.com/smartystreets/goconvey/convey"
@@ -9,13 +10,25 @@ import (
 type TestPeer struct {
 	received Message
 	sent     Message
+
+	sync.RWMutex
 }
 
 func (s *TestPeer) Send(msg Message) error {
+	s.Lock()
+	defer s.Unlock()
+
 	s.received = msg
 	return nil
 }
 
+func (s *TestPeer) getReceived() Message {
+	s.RLock()
+	defer s.RUnlock()
+
+	return s.received
+}
+
 // TODO: implement me
 func (s *TestPeer) Receive() <-chan Message { return nil }
 func (s *TestPeer) Close() error            { return nil }
diff --git a/data_races.txt b/data_races.txt
new file mode 100644
index 0000000..d24dd8f
--- /dev/null
+++ b/data_races.txt
@@ -0,0 +1,44 @@
+...
+3 total assertions
+
+...
+6 total assertions
+
+.
+7 total assertions
+
+.
+8 total assertions
+
+...............
+23 total assertions
+
+.......
+30 total assertions
+
+...
+33 total assertions
+
+..........
+43 total assertions
+
+.....
+48 total assertions
+
+..
+50 total assertions
+
+.....
+55 total assertions
+
+......
+61 total assertions
+
+.....
+66 total assertions
+
+...
+69 total assertions
+
+PASS
+ok  	gopkg.in/beatgammit/turnpike.v2	1.045s
diff --git a/dealer.go b/dealer.go
index dcfc81b..3758714 100644
--- a/dealer.go
+++ b/dealer.go
@@ -46,6 +46,8 @@ type defaultDealer struct {
 	// single lock for all invocations; could use RWLock, but in most (all?) cases we want a write lock
 	// TODO: add the lock per session
 	invocationLock sync.Mutex
+
+	sync.RWMutex
 }
 
 // NewDefaultDealer returns the default turnpike dealer implementation
@@ -58,6 +60,9 @@ func NewDefaultDealer() Dealer {
 }
 
 func (d *defaultDealer) Register(sess *Session, msg *Register) {
+	d.Lock()
+	defer d.Unlock()
+
 	if id, ok := d.procedures[msg.Procedure]; ok {
 		log.Println("error: procedure already exists:", msg.Procedure, id)
 		sess.Peer.Send(&Error{
@@ -82,6 +87,9 @@ func (d *defaultDealer) Register(sess *Session, msg *Register) {
 }
 
 func (d *defaultDealer) Unregister(sess *Session, msg *Unregister) {
+	d.Lock()
+	defer d.Unlock()
+
 	if procedure, ok := d.registrations[msg.Registration]; !ok {
 		// the registration doesn't exist
 		log.Println("error: no such registration:", msg.Registration)
@@ -103,6 +111,9 @@ func (d *defaultDealer) Unregister(sess *Session, msg *Unregister) {
 }
 
 func (d *defaultDealer) Call(sess *Session, msg *Call) {
+	d.Lock()
+	defer d.Unlock()
+
 	d.invocationLock.Lock()
 	defer d.invocationLock.Unlock()
 
@@ -136,6 +147,9 @@ func (d *defaultDealer) Call(sess *Session, msg *Call) {
 }
 
 func (d *defaultDealer) Yield(sess *Session, msg *Yield) {
+	d.Lock()
+	defer d.Unlock()
+
 	d.invocationLock.Lock()
 	defer d.invocationLock.Unlock()
 
diff --git a/dealer_test.go b/dealer_test.go
index 49cb054..97fcaf8 100644
--- a/dealer_test.go
+++ b/dealer_test.go
@@ -84,7 +84,7 @@ func TestCall(t *testing.T) {
 			dealer.Call(callerSession, msg)
 
 			Convey("The caller should have received an ERROR message", func() {
-				err := caller.received.(*Error)
+				err := caller.getReceived().(*Error)
 				So(err.Error, ShouldEqual, ErrNoSuchProcedure)
 				So(err.Details, ShouldNotBeNil)
 			})
@@ -106,9 +106,9 @@ func TestCall(t *testing.T) {
 					time.Sleep(time.Millisecond)
 
 					Convey("The caller should have received a RESULT message", func() {
-						So(caller.received, ShouldNotBeNil)
-						So(caller.received.MessageType(), ShouldEqual, RESULT)
-						So(caller.received.(*Result).Request, ShouldEqual, 125)
+						So(caller.getReceived(), ShouldNotBeNil)
+						So(caller.getReceived().MessageType(), ShouldEqual, RESULT)
+						So(caller.getReceived().(*Result).Request, ShouldEqual, 125)
 					})
 				})
 
@@ -120,9 +120,9 @@ func TestCall(t *testing.T) {
 					time.Sleep(time.Millisecond)
 
 					Convey("The caller should have received an ERROR message", func() {
-						So(caller.received, ShouldNotBeNil)
-						So(caller.received.MessageType(), ShouldEqual, ERROR)
-						So(caller.received.(*Error).Request, ShouldEqual, 125)
+						So(caller.getReceived(), ShouldNotBeNil)
+						So(caller.getReceived().MessageType(), ShouldEqual, ERROR)
+						So(caller.getReceived().(*Error).Request, ShouldEqual, 125)
 					})
 				})
 			})
diff --git a/realm.go b/realm.go
index 433686b..3f48b2f 100644
--- a/realm.go
+++ b/realm.go
@@ -2,6 +2,7 @@ package turnpike
 
 import (
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/streamrail/concurrent-map"
@@ -16,22 +17,25 @@ const (
 // Clients that have connected to a WAMP router are joined to a realm and all
 // message delivery is handled by the realm.
 type Realm struct {
-	_   string
-	URI URI
-	Broker
-	Dealer
-	Authorizer
-	Interceptor
+	_                string
+	URI              URI
+	Broker           Broker
+	Dealer           Dealer
+	Authorizer       Authorizer
+	Interceptor      Interceptor
 	CRAuthenticators map[string]CRAuthenticator
 	Authenticators   map[string]Authenticator
 	// DefaultAuth      func(details map[string]interface{}) (map[string]interface{}, error)
 	AuthTimeout time.Duration
 	clients     cmap.ConcurrentMap
-	localClient
+	localClient *localClient
+
+	lock sync.RWMutex
 }
 
 type localClient struct {
 	*Client
+	sync.Mutex
 }
 
 func (r *Realm) getPeer(details map[string]interface{}) (Peer, error) {
@@ -58,9 +62,18 @@ func (r Realm) Close() {
 }
 
 func (r *Realm) init() {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
 	r.clients = cmap.New()
-	p, _ := r.getPeer(nil)
-	r.localClient.Client = NewClient(p)
+
+	if r.localClient == nil {
+		p, _ := r.getPeer(nil)
+		client := NewClient(p)
+		r.localClient = new(localClient)
+		r.localClient.Client = client
+	}
+
 	if r.Broker == nil {
 		r.Broker = NewDefaultBroker()
 	}
@@ -82,92 +95,109 @@ func (r *Realm) init() {
 // }
 
 func (l *localClient) onJoin(details map[string]interface{}) {
+	l.Lock()
+	defer l.Unlock()
 	l.Publish("wamp.session.on_join", []interface{}{details}, nil)
 }
 
 func (l *localClient) onLeave(session ID) {
+	l.Lock()
+	defer l.Unlock()
 	l.Publish("wamp.session.on_leave", []interface{}{session}, nil)
 }
 
+func (r *Realm) doOne(c <-chan Message, sess *Session) bool {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+
+	var msg Message
+	var open bool
+	select {
+	case msg, open = <-c:
+		if !open {
+			log.Println("lost session:", sess)
+			return false
+		}
+	case reason := <-sess.kill:
+		logErr(sess.Send(&Goodbye{Reason: reason, Details: make(map[string]interface{})}))
+		log.Printf("kill session %s: %v", sess, reason)
+		// TODO: wait for client Goodbye?
+		return false
+	}
+
+	log.Printf("[%s] %s: %+v", sess, msg.MessageType(), msg)
+	if isAuthz, err := r.Authorizer.Authorize(sess, msg); !isAuthz {
+		errMsg := &Error{Type: msg.MessageType()}
+		if err != nil {
+			errMsg.Error = ErrAuthorizationFailed
+			log.Printf("[%s] authorization failed: %v", sess, err)
+		} else {
+			errMsg.Error = ErrNotAuthorized
+			log.Printf("[%s] %s UNAUTHORIZED", sess, msg.MessageType())
+		}
+		logErr(sess.Send(errMsg))
+		return true
+	}
+
+	r.Interceptor.Intercept(sess, &msg)
+
+	switch msg := msg.(type) {
+	case *Goodbye:
+		logErr(sess.Send(&Goodbye{Reason: ErrGoodbyeAndOut, Details: make(map[string]interface{})}))
+		log.Printf("[%s] leaving: %v", sess, msg.Reason)
+		return false
+
+	// Broker messages
+	case *Publish:
+		r.Broker.Publish(sess, msg)
+	case *Subscribe:
+		r.Broker.Subscribe(sess, msg)
+	case *Unsubscribe:
+		r.Broker.Unsubscribe(sess, msg)
+
+	// Dealer messages
+	case *Register:
+		r.Dealer.Register(sess, msg)
+	case *Unregister:
+		r.Dealer.Unregister(sess, msg)
+	case *Call:
+		r.Dealer.Call(sess, msg)
+	case *Yield:
+		r.Dealer.Yield(sess, msg)
+
+	// Error messages
+	case *Error:
+		if msg.Type == INVOCATION {
+			// the only type of ERROR message the router should receive
+			r.Dealer.Error(sess, msg)
+		} else {
+			log.Printf("invalid ERROR message received: %v", msg)
+		}
+
+	default:
+		log.Println("Unhandled message:", msg.MessageType())
+	}
+	return true
+}
+
 func (r *Realm) handleSession(sess *Session) {
+	r.lock.RLock()
 	r.clients.Set(string(sess.Id), sess)
-	r.onJoin(sess.Details)
+	r.localClient.onJoin(sess.Details)
+	r.lock.RUnlock()
+
 	defer func() {
+		r.lock.RLock()
+		defer r.lock.RUnlock()
+
 		r.clients.Remove(string(sess.Id))
 		r.Dealer.RemoveSession(sess)
-		r.onLeave(sess.Id)
+		r.localClient.onLeave(sess.Id)
 	}()
 	c := sess.Receive()
 	// TODO: what happens if the realm is closed?
 
-	for {
-		var msg Message
-		var open bool
-		select {
-		case msg, open = <-c:
-			if !open {
-				log.Println("lost session:", sess)
-				return
-			}
-		case reason := <-sess.kill:
-			logErr(sess.Send(&Goodbye{Reason: reason, Details: make(map[string]interface{})}))
-			log.Printf("kill session %s: %v", sess, reason)
-			// TODO: wait for client Goodbye?
-			return
-		}
-
-		log.Printf("[%s] %s: %+v", sess, msg.MessageType(), msg)
-		if isAuthz, err := r.Authorizer.Authorize(sess, msg); !isAuthz {
-			errMsg := &Error{Type: msg.MessageType()}
-			if err != nil {
-				errMsg.Error = ErrAuthorizationFailed
-				log.Printf("[%s] authorization failed: %v", sess, err)
-			} else {
-				errMsg.Error = ErrNotAuthorized
-				log.Printf("[%s] %s UNAUTHORIZED", sess, msg.MessageType())
-			}
-			logErr(sess.Send(errMsg))
-			continue
-		}
-
-		r.Interceptor.Intercept(sess, &msg)
-
-		switch msg := msg.(type) {
-		case *Goodbye:
-			logErr(sess.Send(&Goodbye{Reason: ErrGoodbyeAndOut, Details: make(map[string]interface{})}))
-			log.Printf("[%s] leaving: %v", sess, msg.Reason)
-			return
-
-		// Broker messages
-		case *Publish:
-			r.Broker.Publish(sess, msg)
-		case *Subscribe:
-			r.Broker.Subscribe(sess, msg)
-		case *Unsubscribe:
-			r.Broker.Unsubscribe(sess, msg)
-
-		// Dealer messages
-		case *Register:
-			r.Dealer.Register(sess, msg)
-		case *Unregister:
-			r.Dealer.Unregister(sess, msg)
-		case *Call:
-			r.Dealer.Call(sess, msg)
-		case *Yield:
-			r.Dealer.Yield(sess, msg)
-
-		// Error messages
-		case *Error:
-			if msg.Type == INVOCATION {
-				// the only type of ERROR message the router should receive
-				r.Dealer.Error(sess, msg)
-			} else {
-				log.Printf("invalid ERROR message received: %v", msg)
-			}
-
-		default:
-			log.Println("Unhandled message:", msg.MessageType())
-		}
+	for r.doOne(c, sess) {
 	}
 }
 
diff --git a/router.go b/router.go
index ee06c50..270ada5 100644
--- a/router.go
+++ b/router.go
@@ -4,6 +4,8 @@ import (
 	"fmt"
 	"sync"
 	"time"
+
+	"github.com/streamrail/concurrent-map"
 )
 
 var defaultWelcomeDetails = map[string]interface{}{
@@ -44,7 +46,7 @@ type Router interface {
 
 // DefaultRouter is the default WAMP router implementation.
 type defaultRouter struct {
-	realms                map[URI]Realm
+	realms                cmap.ConcurrentMap
 	closing               bool
 	closeLock             sync.Mutex
 	sessionOpenCallbacks  []func(uint, string)
@@ -54,7 +56,7 @@ type defaultRouter struct {
 // NewDefaultRouter creates a very basic WAMP router.
 func NewDefaultRouter() Router {
 	return &defaultRouter{
-		realms:                make(map[URI]Realm),
+		realms:                cmap.New(),
 		sessionOpenCallbacks:  []func(uint, string){},
 		sessionCloseCallbacks: []func(uint, string){},
 	}
@@ -76,18 +78,22 @@ func (r *defaultRouter) Close() error {
 	}
 	r.closing = true
 	r.closeLock.Unlock()
-	for _, realm := range r.realms {
-		realm.Close()
+	for val := range r.realms.Iter() {
+		if realm, ok := val.Val.(*Realm); ok {
+			realm.Close()
+		} else {
+			log.Printf("defaultRouter.Close: expecting realm, found invalid type: %T", val.Val)
+		}
 	}
 	return nil
 }
 
 func (r *defaultRouter) RegisterRealm(uri URI, realm Realm) error {
-	if _, ok := r.realms[uri]; ok {
+	if _, ok := r.realms.Get(string(uri)); ok {
 		return RealmExistsError(uri)
 	}
 	realm.init()
-	r.realms[uri] = realm
+	r.realms.Set(string(uri), &realm)
 	log.Println("registered realm:", uri)
 	return nil
 }
@@ -112,8 +118,14 @@ func (r *defaultRouter) Accept(client Peer) error {
 		return fmt.Errorf("protocol violation: expected HELLO, received %s", msg.MessageType())
 	}
 
-	realm, ok := r.realms[hello.Realm]
-	if !ok {
+	var realm *Realm
+	if val, ok := r.realms.Get(string(hello.Realm)); ok {
+		if realm, ok = val.(*Realm); !ok {
+			logErr(client.Send(&Abort{Reason: ErrNoSuchRealm}))
+			logErr(client.Close())
+			return NoSuchRealmError(hello.Realm)
+		}
+	} else {
 		logErr(client.Send(&Abort{Reason: ErrNoSuchRealm}))
 		logErr(client.Close())
 		return NoSuchRealmError(hello.Realm)
@@ -170,12 +182,14 @@ func (r *defaultRouter) Accept(client Peer) error {
 
 // GetLocalPeer returns an internal peer connected to the specified realm.
 func (r *defaultRouter) GetLocalPeer(realmURI URI, details map[string]interface{}) (Peer, error) {
-	realm, ok := r.realms[realmURI]
-	if !ok {
+	if val, ok := r.realms.Get(string(realmURI)); !ok {
+		return nil, NoSuchRealmError(realmURI)
+	} else if realm, ok := val.(*Realm); !ok {
 		return nil, NoSuchRealmError(realmURI)
+	} else {
+		// TODO: session open/close callbacks?
+		return realm.getPeer(details)
 	}
-	// TODO: session open/close callbacks?
-	return realm.getPeer(details)
 }
 
 func (r *defaultRouter) getTestPeer() Peer {

From 7e3d305799ec421577b6e6e343d98f9fa7303ff9 Mon Sep 17 00:00:00 2001
From: "T. Jameson Little" <t.jameson.little@gmail.com>
Date: Mon, 1 Aug 2016 13:36:24 -0600
Subject: [PATCH 10/25] Fix problems using the wrong types of ids in pub/sub

- fixes problems with multiple subscribers to the same topic
---
 broker.go      | 15 +++++++++++++--
 broker_test.go | 37 ++++++++++++++++++++++++++++++++++++-
 2 files changed, 49 insertions(+), 3 deletions(-)

diff --git a/broker.go b/broker.go
index b28827c..62e92f9 100644
--- a/broker.go
+++ b/broker.go
@@ -20,6 +20,8 @@ type defaultBroker struct {
 	routes        map[URI]map[ID]Sender
 	subscriptions map[ID]URI
 
+	lastRequestId ID
+
 	sync.RWMutex
 }
 
@@ -32,6 +34,15 @@ func NewDefaultBroker() Broker {
 	}
 }
 
+func (br *defaultBroker) nextRequestId() ID {
+	br.lastRequestId++
+	if br.lastRequestId > MAX_REQUEST_ID {
+		br.lastRequestId = 1
+	}
+
+	return br.lastRequestId
+}
+
 // Publish sends a message to all subscribed clients except for the sender.
 //
 // If msg.Options["acknowledge"] == true, the publisher receives a Published event
@@ -41,7 +52,7 @@ func (br *defaultBroker) Publish(sess *Session, msg *Publish) {
 	defer br.RUnlock()
 
 	pub := sess.Peer
-	pubID := sess.NextRequestId()
+	pubID := NewID()
 	evtTemplate := Event{
 		Publication: pubID,
 		Arguments:   msg.Arguments,
@@ -78,7 +89,7 @@ func (br *defaultBroker) Subscribe(sess *Session, msg *Subscribe) {
 	if _, ok := br.routes[msg.Topic]; !ok {
 		br.routes[msg.Topic] = make(map[ID]Sender)
 	}
-	id := sess.NextRequestId()
+	id := br.nextRequestId()
 	br.routes[msg.Topic][id] = sess.Peer
 
 	br.subscriptions[id] = msg.Topic
diff --git a/broker_test.go b/broker_test.go
index d428ee9..aa75a83 100644
--- a/broker_test.go
+++ b/broker_test.go
@@ -53,8 +53,43 @@ func TestSubscribe(t *testing.T) {
 			So(ok, ShouldBeTrue)
 			So(topic, ShouldEqual, testTopic)
 		})
+	})
+}
+
+func TestBrokerNextRequestId(t *testing.T) {
+	Convey("nextRequestId called multiple times", t, func() {
+		broker := NewDefaultBroker().(*defaultBroker)
+		id1 := broker.nextRequestId()
+		id2 := broker.nextRequestId()
+
+		So(id1, ShouldNotEqual, id2)
+	})
+
+	Convey("nextRequestId should roll over", t, func() {
+		broker := NewDefaultBroker().(*defaultBroker)
+		broker.lastRequestId = MAX_REQUEST_ID
+		id := broker.nextRequestId()
+
+		So(id, ShouldEqual, 1)
+	})
+}
 
-		// TODO: multiple subscribe requests?
+func TestMultipleSubscribe(t *testing.T) {
+	const SUBSCRIBERS = 2
+	const TOPIC = URI("turnpike.test.topic")
+
+	Convey("Multiple subscribers to a topic", t, func() {
+		broker := NewDefaultBroker().(*defaultBroker)
+		for i := 0; i < SUBSCRIBERS; i++ {
+			subscriber := &TestPeer{}
+			sess := &Session{Peer: subscriber, Id: NewID()}
+			msg := &Subscribe{Request: sess.NextRequestId(), Topic: TOPIC}
+			broker.Subscribe(sess, msg)
+		}
+
+		Convey("There should be a map entry for each subscriber", func() {
+			So(len(broker.routes[TOPIC]), ShouldEqual, SUBSCRIBERS)
+		})
 	})
 }
 

From 405ff6929714d11c3d0aceab608fba727e397e91 Mon Sep 17 00:00:00 2001
From: "T. Jameson Little" <t.jameson.little@gmail.com>
Date: Thu, 4 Aug 2016 09:49:26 -0600
Subject: [PATCH 11/25] Change all imports to my repo

---
 examples/auth/client/client.go   | 2 +-
 examples/auth/server/server.go   | 2 +-
 examples/autobahn/server/main.go | 2 +-
 examples/chat/chatclient/main.go | 2 +-
 examples/chat/chatserver/main.go | 2 +-
 examples/meta-api/meta-client.go | 2 +-
 examples/rpc/rpc-client/main.go  | 2 +-
 examples/rpc/rpc-server/main.go  | 2 +-
 examples/web/server.go           | 2 +-
 turnpike/main.go                 | 2 +-
 10 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/examples/auth/client/client.go b/examples/auth/client/client.go
index 30a6e0a..2ae5944 100644
--- a/examples/auth/client/client.go
+++ b/examples/auth/client/client.go
@@ -9,7 +9,7 @@ import (
 	"time"
 
 	"github.com/howeyc/gopass"
-	"gopkg.in/jcelliott/turnpike.v2"
+	"gopkg.in/beatgammit/turnpike.v2"
 )
 
 var password []byte
diff --git a/examples/auth/server/server.go b/examples/auth/server/server.go
index 877aa4a..326a04a 100644
--- a/examples/auth/server/server.go
+++ b/examples/auth/server/server.go
@@ -9,7 +9,7 @@ import (
 	"net/http"
 
 	"github.com/satori/go.uuid"
-	"gopkg.in/jcelliott/turnpike.v2"
+	"gopkg.in/beatgammit/turnpike.v2"
 )
 
 // this is just an example, please don't actually use it
diff --git a/examples/autobahn/server/main.go b/examples/autobahn/server/main.go
index 8f9c40b..914f3fa 100644
--- a/examples/autobahn/server/main.go
+++ b/examples/autobahn/server/main.go
@@ -4,7 +4,7 @@ import (
 	"log"
 	"net/http"
 
-	"gopkg.in/jcelliott/turnpike.v2"
+	"gopkg.in/beatgammit/turnpike.v2"
 )
 
 func main() {
diff --git a/examples/chat/chatclient/main.go b/examples/chat/chatclient/main.go
index 1395f35..760fdf2 100644
--- a/examples/chat/chatclient/main.go
+++ b/examples/chat/chatclient/main.go
@@ -4,7 +4,7 @@ import (
 	"log"
 	"os"
 
-	"gopkg.in/jcelliott/turnpike.v2"
+	"gopkg.in/beatgammit/turnpike.v2"
 )
 
 const (
diff --git a/examples/chat/chatserver/main.go b/examples/chat/chatserver/main.go
index 8f9c40b..914f3fa 100644
--- a/examples/chat/chatserver/main.go
+++ b/examples/chat/chatserver/main.go
@@ -4,7 +4,7 @@ import (
 	"log"
 	"net/http"
 
-	"gopkg.in/jcelliott/turnpike.v2"
+	"gopkg.in/beatgammit/turnpike.v2"
 )
 
 func main() {
diff --git a/examples/meta-api/meta-client.go b/examples/meta-api/meta-client.go
index bdd8310..0634d7c 100644
--- a/examples/meta-api/meta-client.go
+++ b/examples/meta-api/meta-client.go
@@ -3,7 +3,7 @@ package main
 import (
 	"log"
 
-	"gopkg.in/jcelliott/turnpike.v2"
+	"gopkg.in/beatgammit/turnpike.v2"
 )
 
 func main() {
diff --git a/examples/rpc/rpc-client/main.go b/examples/rpc/rpc-client/main.go
index ff3516c..786c945 100644
--- a/examples/rpc/rpc-client/main.go
+++ b/examples/rpc/rpc-client/main.go
@@ -7,7 +7,7 @@ import (
 	"os"
 	"strconv"
 
-	"gopkg.in/jcelliott/turnpike.v2"
+	"gopkg.in/beatgammit/turnpike.v2"
 )
 
 func main() {
diff --git a/examples/rpc/rpc-server/main.go b/examples/rpc/rpc-server/main.go
index ff428b7..0e26b74 100644
--- a/examples/rpc/rpc-server/main.go
+++ b/examples/rpc/rpc-server/main.go
@@ -5,7 +5,7 @@ import (
 	"net/http"
 	"time"
 
-	"gopkg.in/jcelliott/turnpike.v2"
+	"gopkg.in/beatgammit/turnpike.v2"
 )
 
 var client *turnpike.Client
diff --git a/examples/web/server.go b/examples/web/server.go
index 0a92017..eddd74e 100644
--- a/examples/web/server.go
+++ b/examples/web/server.go
@@ -4,7 +4,7 @@ import (
 	"log"
 	"net/http"
 
-	"gopkg.in/jcelliott/turnpike.v2"
+	"gopkg.in/beatgammit/turnpike.v2"
 )
 
 func main() {
diff --git a/turnpike/main.go b/turnpike/main.go
index b11fc68..58c1b26 100644
--- a/turnpike/main.go
+++ b/turnpike/main.go
@@ -8,7 +8,7 @@ import (
 	"os/signal"
 	"time"
 
-	"gopkg.in/jcelliott/turnpike.v2"
+	"gopkg.in/beatgammit/turnpike.v2"
 )
 
 var (

From c88b89f52b388a5f4e5b9f2f00c472a87388419e Mon Sep 17 00:00:00 2001
From: "T. Jameson Little" <t.jameson.little@gmail.com>
Date: Thu, 4 Aug 2016 09:53:24 -0600
Subject: [PATCH 12/25] Changed most references to old repo

---
 README.md | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/README.md b/README.md
index 80d02a7..9cdd0c5 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-Turnpike [![Build Status](https://drone.io/github.com/jcelliott/turnpike/status.png)](https://drone.io/github.com/jcelliott/turnpike/latest) [![Coverage Status](https://coveralls.io/repos/jcelliott/turnpike/badge.svg?branch=v2)](https://coveralls.io/r/jcelliott/turnpike?branch=v2) [![GoDoc](https://godoc.org/gopkg.in/jcelliott/turnpike?status.svg)](http://godoc.org/gopkg.in/jcelliott/turnpike.v2)
+Turnpike [![Build Status](https://drone.io/github.com/jcelliott/turnpike/status.png)](https://drone.io/github.com/jcelliott/turnpike/latest) [![Coverage Status](https://coveralls.io/repos/jcelliott/turnpike/badge.svg?branch=v2)](https://coveralls.io/r/jcelliott/turnpike?branch=v2) [![GoDoc](https://godoc.org/gopkg.in/beatgammit/turnpike?status.svg)](http://godoc.org/gopkg.in/beatgammit/turnpike.v2)
 ===
 
 Go implementation of [WAMP](http://wamp.ws/) - The Web Application Messaging Protocol
@@ -18,25 +18,23 @@ basic stand-alone router. The router library can be used to embed a WAMP router
 in another application, or to build a custom router implementation. The client
 library can be used to communicate with any WAMP router.
 
-This version of Turnpike supports WAMP v2. For WAMP v1 support see the [v1 branch](https://github.com/jcelliott/turnpike/tree/v1).
-
 Status
 ---
 
 Turnpike v2 is still under development, but is getting close to a stable
 release. If you have any feedback or suggestions, please
-[open an issue](https://github.com/jcelliott/turnpike/issues/new).
+[open an issue](https://github.com/beatgammit/turnpike/issues/new).
 
 Installation
 ---
 
 Library:
 
-    go get -u gopkg.in/jcelliott/turnpike.v2
+    go get -u gopkg.in/beatgammit/turnpike.v2
 
 Stand-alone router:
 
-    go get -u gopkg.in/jcelliott/turnpike.v2/turnpike
+    go get -u gopkg.in/beatgammit/turnpike.v2/turnpike
 
 Client library usage
 ---
@@ -56,7 +54,7 @@ import (
 	"log"
 	"net/http"
 
-	"gopkg.in/jcelliott/turnpike.v2"
+	"gopkg.in/beatgammit/turnpike.v2"
 )
 
 func main() {

From ae44bc4bc4a440bb3ff1c3ccfc2b801104394ee2 Mon Sep 17 00:00:00 2001
From: "T. Jameson Little" <t.jameson.little@gmail.com>
Date: Thu, 4 Aug 2016 09:55:31 -0600
Subject: [PATCH 13/25] Fix compile errors in examples

- no guarantee they still work, but this makes it easier to run tests
---
 examples/auth/client/client.go   | 9 +++++++--
 examples/chat/chatclient/main.go | 2 +-
 examples/meta-api/meta-client.go | 2 +-
 examples/rpc/rpc-client/main.go  | 2 +-
 4 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/examples/auth/client/client.go b/examples/auth/client/client.go
index 2ae5944..e102c24 100644
--- a/examples/auth/client/client.go
+++ b/examples/auth/client/client.go
@@ -29,9 +29,14 @@ func main() {
 	turnpike.Debug()
 	fmt.Println("Hint: the password is 'password'")
 	fmt.Print("Password: ")
-	password = gopass.GetPasswd()
 
-	c, err := turnpike.NewWebsocketClient(turnpike.JSON, "ws://localhost:8000/ws")
+	var err error
+	password, err = gopass.GetPasswd()
+	if err != nil {
+		log.Fatal("Error getting the password:", err)
+	}
+
+	c, err := turnpike.NewWebsocketClient(turnpike.JSON, "ws://localhost:8000/ws", nil)
 	if err != nil {
 		log.Fatal(err)
 	}
diff --git a/examples/chat/chatclient/main.go b/examples/chat/chatclient/main.go
index 760fdf2..fd219c4 100644
--- a/examples/chat/chatclient/main.go
+++ b/examples/chat/chatclient/main.go
@@ -36,7 +36,7 @@ func main() {
 	username := os.Args[1]
 
 	// turnpike.Debug()
-	c, err := turnpike.NewWebsocketClient(turnpike.JSON, "ws://localhost:8000/")
+	c, err := turnpike.NewWebsocketClient(turnpike.JSON, "ws://localhost:8000/", nil)
 	if err != nil {
 		log.Fatal(err)
 	}
diff --git a/examples/meta-api/meta-client.go b/examples/meta-api/meta-client.go
index 0634d7c..b71e513 100644
--- a/examples/meta-api/meta-client.go
+++ b/examples/meta-api/meta-client.go
@@ -7,7 +7,7 @@ import (
 )
 
 func main() {
-	c, err := turnpike.NewWebsocketClient(turnpike.JSON, "ws://localhost:8000/")
+	c, err := turnpike.NewWebsocketClient(turnpike.JSON, "ws://localhost:8000/", nil)
 	if err != nil {
 		log.Fatal(err)
 	}
diff --git a/examples/rpc/rpc-client/main.go b/examples/rpc/rpc-client/main.go
index 786c945..66ef4f7 100644
--- a/examples/rpc/rpc-client/main.go
+++ b/examples/rpc/rpc-client/main.go
@@ -12,7 +12,7 @@ import (
 
 func main() {
 	turnpike.Debug()
-	c, err := turnpike.NewWebsocketClient(turnpike.JSON, "ws://localhost:8000/")
+	c, err := turnpike.NewWebsocketClient(turnpike.JSON, "ws://localhost:8000/", nil)
 	if err != nil {
 		log.Fatal(err)
 	}

From 4b399d494b1a71ac0cbfa034a7c5f377c1b0763f Mon Sep 17 00:00:00 2001
From: "T. Jameson Little" <t.jameson.little@gmail.com>
Date: Thu, 4 Aug 2016 10:24:47 -0600
Subject: [PATCH 14/25] Added locks to Client

- single rw lock to make eliminate races, could definitely use some
  performance tuning/refactoring
---
 client.go | 27 +++++++++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/client.go b/client.go
index 3120f32..3052184 100644
--- a/client.go
+++ b/client.go
@@ -3,6 +3,7 @@ package turnpike
 import (
 	"crypto/tls"
 	"fmt"
+	"sync"
 	"time"
 )
 
@@ -38,6 +39,8 @@ type Client struct {
 	events       map[ID]*eventDesc
 	procedures   map[ID]*procedureDesc
 	requestCount uint
+
+	lock sync.RWMutex
 }
 
 type procedureDesc struct {
@@ -212,11 +215,13 @@ func (c *Client) Receive() {
 		switch msg := msg.(type) {
 
 		case *Event:
+			c.lock.RLock()
 			if event, ok := c.events[msg.Subscription]; ok {
 				go event.handler(msg.Arguments, msg.ArgumentsKw)
 			} else {
 				log.Println("no handler registered for subscription:", msg.Subscription)
 			}
+			c.lock.RUnlock()
 
 		case *Invocation:
 			c.handleInvocation(msg)
@@ -251,6 +256,8 @@ func (c *Client) Receive() {
 
 func (c *Client) notifyListener(msg Message, requestID ID) {
 	// pass in the request ID so we don't have to do any type assertion
+	c.lock.RLock()
+	defer c.lock.RUnlock()
 	if l, ok := c.listeners[requestID]; ok {
 		l <- msg
 	} else {
@@ -259,7 +266,9 @@ func (c *Client) notifyListener(msg Message, requestID ID) {
 }
 
 func (c *Client) handleInvocation(msg *Invocation) {
+	c.lock.RLock()
 	if proc, ok := c.procedures[msg.Registration]; ok {
+		c.lock.RUnlock()
 		go func() {
 			result := proc.handler(msg.Arguments, msg.ArgumentsKw, msg.Details)
 
@@ -287,6 +296,7 @@ func (c *Client) handleInvocation(msg *Invocation) {
 			}
 		}()
 	} else {
+		c.lock.RUnlock()
 		log.Println("no handler registered for registration:", msg.Registration)
 		if err := c.Send(&Error{
 			Type:    INVOCATION,
@@ -302,12 +312,16 @@ func (c *Client) handleInvocation(msg *Invocation) {
 func (c *Client) registerListener(id ID) {
 	log.Println("register listener:", id)
 	wait := make(chan Message, 1)
+	c.lock.Lock()
+	defer c.lock.Unlock()
 	c.listeners[id] = wait
 }
 
 func (c *Client) waitOnListener(id ID) (msg Message, err error) {
 	log.Println("wait on listener:", id)
+	c.lock.RLock()
 	wait, ok := c.listeners[id]
+	c.lock.RUnlock()
 	if !ok {
 		return nil, fmt.Errorf("unknown listener ID: %v", id)
 	}
@@ -345,6 +359,8 @@ func (c *Client) Subscribe(topic string, fn EventHandler) error {
 		return fmt.Errorf(formatUnexpectedMessage(msg, SUBSCRIBED))
 	} else {
 		// register the event handler with this subscription
+		c.lock.Lock()
+		defer c.lock.Unlock()
 		c.events[subscribed.Subscription] = &eventDesc{topic, fn}
 	}
 	return nil
@@ -356,12 +372,14 @@ func (c *Client) Unsubscribe(topic string) error {
 		subscriptionID ID
 		found          bool
 	)
+	c.lock.RLock()
 	for id, desc := range c.events {
 		if desc.topic == topic {
 			subscriptionID = id
 			found = true
 		}
 	}
+	c.lock.RUnlock()
 	if !found {
 		return fmt.Errorf("Event %s is not registered with this client.", topic)
 	}
@@ -384,6 +402,9 @@ func (c *Client) Unsubscribe(topic string) error {
 	} else if _, ok := msg.(*Unsubscribed); !ok {
 		return fmt.Errorf(formatUnexpectedMessage(msg, UNSUBSCRIBED))
 	}
+
+	c.lock.Lock()
+	defer c.lock.Unlock()
 	delete(c.events, subscriptionID)
 	return nil
 }
@@ -416,6 +437,8 @@ func (c *Client) Register(procedure string, fn MethodHandler, options map[string
 		return fmt.Errorf(formatUnexpectedMessage(msg, REGISTERED))
 	} else {
 		// register the event handler with this registration
+		c.lock.Lock()
+		defer c.lock.Unlock()
 		c.procedures[registered.Registration] = &procedureDesc{procedure, fn}
 	}
 	return nil
@@ -439,12 +462,14 @@ func (c *Client) Unregister(procedure string) error {
 		procedureID ID
 		found       bool
 	)
+	c.lock.RLock()
 	for id, p := range c.procedures {
 		if p.name == procedure {
 			procedureID = id
 			found = true
 		}
 	}
+	c.lock.RUnlock()
 	if !found {
 		return fmt.Errorf("Procedure %s is not registered with this client.", procedure)
 	}
@@ -468,6 +493,8 @@ func (c *Client) Unregister(procedure string) error {
 		return fmt.Errorf(formatUnexpectedMessage(msg, UNREGISTERED))
 	}
 	// register the event handler with this unregistration
+	c.lock.Lock()
+	defer c.lock.Unlock()
 	delete(c.procedures, procedureID)
 	return nil
 }

From c1cf8e07d75d9fcae432ac910af39e70cb708bda Mon Sep 17 00:00:00 2001
From: "T. Jameson Little" <t.jameson.little@gmail.com>
Date: Thu, 4 Aug 2016 10:45:14 -0600
Subject: [PATCH 15/25] Added more locks for unlikely, but still possible data
 races

---
 dealer.go           | 4 +++-
 websocket_server.go | 9 +++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/dealer.go b/dealer.go
index 3758714..d6d6fea 100644
--- a/dealer.go
+++ b/dealer.go
@@ -210,8 +210,10 @@ func (d *defaultDealer) Error(sess *Session, msg *Error) {
 }
 
 func (d *defaultDealer) RemoveSession(sess *Session) {
+	d.Lock()
+	defer d.Unlock()
+
 	// TODO: this is low hanging fruit for optimization
-	// TODO: potential data race here, should lock the map
 	for _, rproc := range d.procedures {
 		if rproc.Endpoint == sess {
 			delete(d.registrations, rproc.Registration)
diff --git a/websocket_server.go b/websocket_server.go
index c04414c..93a6ae5 100644
--- a/websocket_server.go
+++ b/websocket_server.go
@@ -3,6 +3,7 @@ package turnpike
 import (
 	"fmt"
 	"net/http"
+	"sync"
 
 	"github.com/gorilla/websocket"
 )
@@ -40,6 +41,8 @@ type WebsocketServer struct {
 	TextSerializer Serializer
 	// The serializer to use for binary frames. Defaults to JSONSerializer.
 	BinarySerializer Serializer
+
+	lock sync.RWMutex
 }
 
 // NewWebsocketServer creates a new WebsocketServer from a map of realms
@@ -79,6 +82,8 @@ func (s *WebsocketServer) RegisterProtocol(proto string, payloadType int, serial
 	if payloadType != websocket.TextMessage && payloadType != websocket.BinaryMessage {
 		return invalidPayload(payloadType)
 	}
+	s.lock.Lock()
+	defer s.lock.Unlock()
 	if _, ok := s.protocols[proto]; ok {
 		return protocolExists(proto)
 	}
@@ -114,10 +119,14 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 func (s *WebsocketServer) handleWebsocket(conn *websocket.Conn) {
 	var serializer Serializer
 	var payloadType int
+	s.lock.RLock()
 	if proto, ok := s.protocols[conn.Subprotocol()]; ok {
+		s.lock.RUnlock()
 		serializer = proto.serializer
 		payloadType = proto.payloadType
 	} else {
+		s.lock.RUnlock()
+
 		// TODO: this will not currently ever be hit because
 		//       gorilla/websocket will reject the conncetion
 		//       if the subprotocol isn't registered

From 71f77d3803ea2f587930fd6c10c80c0cc08813bf Mon Sep 17 00:00:00 2001
From: "T. Jameson Little" <t.jameson.little@gmail.com>
Date: Fri, 19 Aug 2016 16:10:40 -0600
Subject: [PATCH 16/25] If the websocket is closed, don't close it again

- it seems like this was blocking when the socket was already closed,
  which caused some stability issues over time as goroutines built up
---
 websocket.go | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/websocket.go b/websocket.go
index 64a6f3f..06e44ef 100644
--- a/websocket.go
+++ b/websocket.go
@@ -86,8 +86,11 @@ func (ep *websocketPeer) run() {
 				log.Println("peer connection closed")
 			} else {
 				log.Println("error reading from peer:", err)
-				ep.conn.Close()
+				if !websocket.IsCloseError(err) {
+					ep.conn.Close()
+				}
 			}
+			log.Println("Closing channel")
 			close(ep.messages)
 			break
 		} else if msgType == websocket.CloseMessage {

From afcecc366d50a10094931daf0240fe674f049375 Mon Sep 17 00:00:00 2001
From: "T. Jameson Little" <t.jameson.little@gmail.com>
Date: Mon, 22 Aug 2016 16:14:52 -0600
Subject: [PATCH 17/25] Close session in realm

---
 realm.go | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/realm.go b/realm.go
index 3f48b2f..d5241d6 100644
--- a/realm.go
+++ b/realm.go
@@ -44,7 +44,10 @@ func (r *Realm) getPeer(details map[string]interface{}) (Peer, error) {
 	if details == nil {
 		details = make(map[string]interface{})
 	}
-	go r.handleSession(sess)
+	go func() {
+		r.handleSession(sess)
+		sess.Close()
+	}()
 	log.Println("Established internal session:", sess)
 	return peerB, nil
 }

From 00ecc285603cad92a814384e743bbd53ccb65257 Mon Sep 17 00:00:00 2001
From: "T. Jameson Little" <t.jameson.little@gmail.com>
Date: Thu, 25 Aug 2016 09:53:02 -0600
Subject: [PATCH 18/25] Send messages in a goroutine

- I noticed a that defaultBroker.Subscribe could block, and the only way
  that could happen is if the Send function in either Unsubscribe
  (unlikely) or Publish (very likely) blocked
- I think there's still a larger problem with transports blocking on the
  Send, but this should at least allow messages to go through if that
  happens
---
 broker.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/broker.go b/broker.go
index 62e92f9..c62b1e9 100644
--- a/broker.go
+++ b/broker.go
@@ -71,13 +71,13 @@ func (br *defaultBroker) Publish(sess *Session, msg *Publish) {
 		event.Subscription = id
 		// don't send event to publisher
 		if sub != pub || !excludePublisher {
-			sub.Send(&event)
+			go sub.Send(&event)
 		}
 	}
 
 	// only send published message if acknowledge is present and set to true
 	if doPub, _ := msg.Options["acknowledge"].(bool); doPub {
-		pub.Send(&Published{Request: msg.Request, Publication: pubID})
+		go pub.Send(&Published{Request: msg.Request, Publication: pubID})
 	}
 }
 
@@ -94,7 +94,7 @@ func (br *defaultBroker) Subscribe(sess *Session, msg *Subscribe) {
 
 	br.subscriptions[id] = msg.Topic
 
-	sess.Peer.Send(&Subscribed{Request: msg.Request, Subscription: id})
+	go sess.Peer.Send(&Subscribed{Request: msg.Request, Subscription: id})
 }
 
 func (br *defaultBroker) Unsubscribe(sess *Session, msg *Unsubscribe) {
@@ -124,5 +124,5 @@ func (br *defaultBroker) Unsubscribe(sess *Session, msg *Unsubscribe) {
 			delete(br.routes, topic)
 		}
 	}
-	sess.Peer.Send(&Unsubscribed{Request: msg.Request})
+	go sess.Peer.Send(&Unsubscribed{Request: msg.Request})
 }

From 4a8e170138a4d1c2ed9aa4c978f30737b1bb5abb Mon Sep 17 00:00:00 2001
From: Aaron Shumway <aclshumway@gmail.com>
Date: Thu, 25 Aug 2016 16:34:37 -0600
Subject: [PATCH 19/25] Added sendMsgs channel to send websocket messages
 without blocking.

Also added a write timeout.
---
 websocket.go        | 170 ++++++++++++++++++++++++++++++++++++++++----
 websocket_server.go |  25 +++++--
 2 files changed, 177 insertions(+), 18 deletions(-)

diff --git a/websocket.go b/websocket.go
index 06e44ef..0a31a34 100644
--- a/websocket.go
+++ b/websocket.go
@@ -2,6 +2,7 @@ package turnpike
 
 import (
 	"crypto/tls"
+	"errors"
 	"fmt"
 	"sync"
 	"time"
@@ -9,13 +10,22 @@ import (
 	"github.com/gorilla/websocket"
 )
 
+// errors.
+var (
+	ErrWSSendTimeout = errors.New("ws peer send timeout")
+	ErrWSIsClosed    = errors.New("ws peer is closed")
+)
+
 type websocketPeer struct {
 	conn        *websocket.Conn
 	serializer  Serializer
+	sendMsgs    chan Message
 	messages    chan Message
 	payloadType int
-	closed      bool
 	mutex       sync.Mutex
+	inSending   chan struct{}
+	closing     chan struct{}
+	*ConnectionConfig
 }
 
 // NewWebsocketPeer connects to the websocket server at the specified url.
@@ -44,45 +54,113 @@ func newWebsocketPeer(url, protocol, origin string, serializer Serializer, paylo
 		return nil, err
 	}
 	ep := &websocketPeer{
-		conn:        conn,
-		messages:    make(chan Message, 100),
-		serializer:  serializer,
-		payloadType: payloadType,
+		conn:             conn,
+		sendMsgs:         make(chan Message, 16),
+		messages:         make(chan Message, 100),
+		serializer:       serializer,
+		payloadType:      payloadType,
+		closing:          make(chan struct{}),
+		ConnectionConfig: &ConnectionConfig{},
 	}
 	go ep.run()
 
 	return ep, nil
 }
 
-// TODO: make this just add the message to a channel so we don't block
 func (ep *websocketPeer) Send(msg Message) error {
-	b, err := ep.serializer.Serialize(msg)
-	if err != nil {
-		return err
+	select {
+	case ep.sendMsgs <- msg:
+		return nil
+	case <-time.After(5 * time.Second):
+		log.Println(ErrWSSendTimeout.Error())
+		ep.Close()
+		return ErrWSSendTimeout
+	case <-ep.closing:
+		log.Println(ErrWSIsClosed.Error())
+		return ErrWSIsClosed
 	}
-	ep.mutex.Lock()
-	defer ep.mutex.Unlock()
-	return ep.conn.WriteMessage(ep.payloadType, b)
 }
+
 func (ep *websocketPeer) Receive() <-chan Message {
 	return ep.messages
 }
+
+func (ep *websocketPeer) doClosing() {
+	ep.mutex.Lock()
+	defer ep.mutex.Unlock()
+
+	select {
+	case <-ep.closing:
+	default:
+		close(ep.closing)
+	}
+}
+
+func (ep *websocketPeer) isClosed() bool {
+	select {
+	case <-ep.closing:
+		return true
+	default:
+		return false
+	}
+}
+
 func (ep *websocketPeer) Close() error {
+	if ep.isClosed() {
+		return nil
+	}
+	ep.doClosing()
+
+	if ep.inSending != nil {
+		<-ep.inSending
+	}
+
 	closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "goodbye")
 	err := ep.conn.WriteControl(websocket.CloseMessage, closeMsg, time.Now().Add(5*time.Second))
 	if err != nil {
 		log.Println("error sending close message:", err)
 	}
-	ep.closed = true
+
 	return ep.conn.Close()
 }
 
+func (ep *websocketPeer) updateReadDeadline() {
+	ep.mutex.Lock()
+	defer ep.mutex.Unlock()
+	if ep.IdleTimeout > 0 {
+		ep.conn.SetReadDeadline(time.Now().Add(ep.IdleTimeout))
+	}
+}
+
+func (ep *websocketPeer) setReadDead() {
+	ep.mutex.Lock()
+	defer ep.mutex.Unlock()
+	ep.conn.SetReadDeadline(time.Now())
+}
+
 func (ep *websocketPeer) run() {
+	go ep.sending()
+
+	if ep.MaxMsgSize > 0 {
+		ep.conn.SetReadLimit(ep.MaxMsgSize)
+	}
+	ep.conn.SetPongHandler(func(v string) error {
+		log.Println("pong:", v)
+		ep.updateReadDeadline()
+		return nil
+	})
+	ep.conn.SetPingHandler(func(v string) error {
+		log.Println("ping:", v)
+		ep.updateReadDeadline()
+		return nil
+	})
+
 	for {
 		// TODO: use conn.NextMessage() and stream
 		// TODO: do something different based on binary/text frames
+		ep.updateReadDeadline()
 		if msgType, b, err := ep.conn.ReadMessage(); err != nil {
-			if ep.closed {
+			if ep.isClosed() {
 				log.Println("peer connection closed")
 			} else {
 				log.Println("error reading from peer:", err)
@@ -108,3 +186,67 @@ func (ep *websocketPeer) run() {
 		}
 	}
 }
+
+func (ep *websocketPeer) sending() {
+	ep.inSending = make(chan struct{})
+	var ticker *time.Ticker
+	if ep.PingTimeout == 0 {
+		ticker = time.NewTicker(7 * 24 * time.Hour)
+	} else {
+		ticker = time.NewTicker(ep.PingTimeout)
+	}
+
+	defer func() {
+		ep.setReadDead()
+		ticker.Stop()
+		close(ep.inSending)
+	}()
+
+	for {
+		select {
+		case msg := <-ep.sendMsgs:
+			if closed, _ := ep.doSend(msg); closed {
+				return
+			}
+		case <-ticker.C:
+			wt := ep.WriteTimeout
+			if wt == 0 {
+				wt = 10 * time.Second
+			}
+			if err := ep.conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(wt)); err != nil {
+				log.Println("error sending ping message:", err)
+				return
+			}
+		case <-ep.closing:
+			// sending remaining messages.
+			for {
+				select {
+				case msg := <-ep.sendMsgs:
+					if closed, _ := ep.doSend(msg); !closed {
+						continue
+					}
+				default:
+				}
+				break
+			}
+			return
+		}
+		ep.updateReadDeadline()
+	}
+}
+
+func (ep *websocketPeer) doSend(msg Message) (closed bool, err error) {
+	b, err := ep.serializer.Serialize(msg)
+	if err != nil {
+		log.Printf("error serializing peer message: %s, %+v", err, msg)
+		return true, err
+	}
+	if ep.WriteTimeout > 0 {
+		ep.conn.SetWriteDeadline(time.Now().Add(ep.WriteTimeout))
+	}
+	if err = ep.conn.WriteMessage(ep.payloadType, b); err != nil {
+		log.Println("error write message: ", err)
+		return true, err
+	}
+	return
+}
diff --git a/websocket_server.go b/websocket_server.go
index 93a6ae5..246b7cc 100644
--- a/websocket_server.go
+++ b/websocket_server.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"net/http"
 	"sync"
+	"time"
 
 	"github.com/gorilla/websocket"
 )
@@ -30,6 +31,14 @@ type protocol struct {
 	serializer  Serializer
 }
 
+// ConnectionConfig is the configs of a connection.
+type ConnectionConfig struct {
+	MaxMsgSize   int64
+	WriteTimeout time.Duration
+	PingTimeout  time.Duration
+	IdleTimeout  time.Duration
+}
+
 // WebsocketServer handles websocket connections.
 type WebsocketServer struct {
 	Router
@@ -41,6 +50,7 @@ type WebsocketServer struct {
 	TextSerializer Serializer
 	// The serializer to use for binary frames. Defaults to JSONSerializer.
 	BinarySerializer Serializer
+	ConnectionConfig
 
 	lock sync.RWMutex
 }
@@ -69,6 +79,10 @@ func newWebsocketServer(r Router) *WebsocketServer {
 	s := &WebsocketServer{
 		Router:    r,
 		protocols: make(map[string]protocol),
+		ConnectionConfig: ConnectionConfig{
+			// PingTimeout: 3 * time.Minute,
+			WriteTimeout: 10 * time.Second,
+		},
 	}
 	s.Upgrader = &websocket.Upgrader{}
 	s.RegisterProtocol(jsonWebsocketProtocol, websocket.TextMessage, new(JSONSerializer))
@@ -144,10 +158,13 @@ func (s *WebsocketServer) handleWebsocket(conn *websocket.Conn) {
 	}
 
 	peer := websocketPeer{
-		conn:        conn,
-		serializer:  serializer,
-		messages:    make(chan Message, 100),
-		payloadType: payloadType,
+		conn:             conn,
+		serializer:       serializer,
+		sendMsgs:         make(chan Message, 16),
+		messages:         make(chan Message, 100),
+		payloadType:      payloadType,
+		closing:          make(chan struct{}),
+		ConnectionConfig: &s.ConnectionConfig,
 	}
 	go peer.run()
 

From 90945ea40fd9820e28d60949e20ab74bda70d157 Mon Sep 17 00:00:00 2001
From: Aaron Shumway <aclshumway@gmail.com>
Date: Thu, 25 Aug 2016 16:51:41 -0600
Subject: [PATCH 20/25] remove session subscriptions when session ends

---
 broker.go | 62 +++++++++++++++++++++++++++++++++++++++++++++++--------
 realm.go  |  1 +
 2 files changed, 54 insertions(+), 9 deletions(-)

diff --git a/broker.go b/broker.go
index c62b1e9..1bc60c6 100644
--- a/broker.go
+++ b/broker.go
@@ -13,12 +13,15 @@ type Broker interface {
 	Subscribe(*Session, *Subscribe)
 	// Unsubscribes from messages on a URI.
 	Unsubscribe(*Session, *Unsubscribe)
+	// Remove all of session's subscriptions.
+	RemoveSession(*Session)
 }
 
 // A super simple broker that matches URIs to Subscribers.
 type defaultBroker struct {
 	routes        map[URI]map[ID]Sender
 	subscriptions map[ID]URI
+	subscribers   map[*Session][]ID
 
 	lastRequestId ID
 
@@ -31,6 +34,7 @@ func NewDefaultBroker() Broker {
 	return &defaultBroker{
 		routes:        make(map[URI]map[ID]Sender),
 		subscriptions: make(map[ID]URI),
+		subscribers:   make(map[*Session][]ID),
 	}
 }
 
@@ -91,38 +95,78 @@ func (br *defaultBroker) Subscribe(sess *Session, msg *Subscribe) {
 	}
 	id := br.nextRequestId()
 	br.routes[msg.Topic][id] = sess.Peer
-
 	br.subscriptions[id] = msg.Topic
 
+	// subscribers
+	ids, ok := br.subscribers[sess]
+	if !ok {
+		ids = []ID{}
+	}
+	ids = append(ids, id)
+	br.subscribers[sess] = ids
+
 	go sess.Peer.Send(&Subscribed{Request: msg.Request, Subscription: id})
 }
 
+func (br *defaultBroker) RemoveSession(sess *Session) {
+	log.Printf("broker remove peer %p", sess)
+	br.Lock()
+	defer br.Unlock()
+
+	for _, id := range br.subscribers[sess] {
+		br.unsubscribe(sess, id)
+	}
+}
+
 func (br *defaultBroker) Unsubscribe(sess *Session, msg *Unsubscribe) {
 	br.Lock()
 	defer br.Unlock()
 
-	topic, ok := br.subscriptions[msg.Subscription]
-	if !ok {
+	if !br.unsubscribe(sess, msg.Subscription) {
 		err := &Error{
 			Type:    msg.MessageType(),
 			Request: msg.Request,
 			Error:   ErrNoSuchSubscription,
 		}
-		sess.Peer.Send(err)
+		go sess.Peer.Send(err)
 		log.Printf("Error unsubscribing: no such subscription %v", msg.Subscription)
 		return
 	}
-	delete(br.subscriptions, msg.Subscription)
+
+	go sess.Peer.Send(&Unsubscribed{Request: msg.Request})
+}
+
+func (br *defaultBroker) unsubscribe(sess *Session, id ID) bool {
+	log.Printf("broker unsubscribing: %p, %d", &sess, id)
+	topic, ok := br.subscriptions[id]
+	if !ok {
+		return false
+	}
+	delete(br.subscriptions, id)
 
 	if r, ok := br.routes[topic]; !ok {
 		log.Printf("Error unsubscribing: unable to find routes for %s topic", topic)
-	} else if _, ok := r[msg.Subscription]; !ok {
-		log.Printf("Error unsubscribing: %s route does not exist for %v subscription", topic, msg.Subscription)
+	} else if _, ok := r[id]; !ok {
+		log.Printf("Error unsubscribing: %s route does not exist for %v subscription", topic, id)
 	} else {
-		delete(r, msg.Subscription)
+		delete(r, id)
 		if len(r) == 0 {
 			delete(br.routes, topic)
 		}
 	}
-	go sess.Peer.Send(&Unsubscribed{Request: msg.Request})
+
+	// subscribers
+	ids := br.subscribers[sess][:0]
+	for _, id := range br.subscribers[sess] {
+		if id != id {
+			ids = append(ids, id)
+		}
+	}
+	if len(ids) == 0 {
+		delete(br.subscribers, sess)
+	} else {
+		br.subscribers[sess] = ids
+	}
+
+	return true
 }
diff --git a/realm.go b/realm.go
index d5241d6..f6454ff 100644
--- a/realm.go
+++ b/realm.go
@@ -194,6 +194,7 @@ func (r *Realm) handleSession(sess *Session) {
 		defer r.lock.RUnlock()
 
 		r.clients.Remove(string(sess.Id))
+		r.Broker.RemoveSession(sess)
 		r.Dealer.RemoveSession(sess)
 		r.localClient.onLeave(sess.Id)
 	}()

From 20397bde7616ddf19a865519713aca7ae14c4b61 Mon Sep 17 00:00:00 2001
From: Jameson Little <t.jameson.little@gmail.com>
Date: Mon, 31 Oct 2016 10:53:37 -0600
Subject: [PATCH 21/25] Clean up client listeners (#2)

- fixes memory leak
---
 client.go | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/client.go b/client.go
index 3052184..492176a 100644
--- a/client.go
+++ b/client.go
@@ -317,6 +317,11 @@ func (c *Client) registerListener(id ID) {
 	c.listeners[id] = wait
 }
 
+func (c *Client) unregisterListener(id ID) {
+	log.Println("unregister listener:", id)
+	delete(c.listeners, id)
+}
+
 func (c *Client) waitOnListener(id ID) (msg Message, err error) {
 	log.Println("wait on listener:", id)
 	c.lock.RLock()
@@ -341,6 +346,9 @@ type EventHandler func(args []interface{}, kwargs map[string]interface{})
 func (c *Client) Subscribe(topic string, fn EventHandler) error {
 	id := NewID()
 	c.registerListener(id)
+	// TODO: figure out where to clean this up
+	// defer c.unregisterListener(id)
+
 	sub := &Subscribe{
 		Request: id,
 		Options: make(map[string]interface{}),
@@ -386,6 +394,8 @@ func (c *Client) Unsubscribe(topic string) error {
 
 	id := NewID()
 	c.registerListener(id)
+	defer c.unregisterListener(id)
+
 	sub := &Unsubscribe{
 		Request:      id,
 		Subscription: subscriptionID,
@@ -418,6 +428,9 @@ type MethodHandler func(
 func (c *Client) Register(procedure string, fn MethodHandler, options map[string]interface{}) error {
 	id := NewID()
 	c.registerListener(id)
+	// TODO: figure out where to clean this up
+	// defer c.unregisterListener(id)
+
 	register := &Register{
 		Request:   id,
 		Options:   options,
@@ -475,6 +488,8 @@ func (c *Client) Unregister(procedure string) error {
 	}
 	id := NewID()
 	c.registerListener(id)
+	defer c.unregisterListener(id)
+
 	unregister := &Unregister{
 		Request:      id,
 		Registration: procedureID,
@@ -523,6 +538,7 @@ func (rpc RPCError) Error() string {
 func (c *Client) Call(procedure string, args []interface{}, kwargs map[string]interface{}) (*Result, error) {
 	id := NewID()
 	c.registerListener(id)
+	defer c.unregisterListener(id)
 
 	call := &Call{
 		Request:     id,

From 4a4b73895f34dbafb57072896d3f75542dbc684b Mon Sep 17 00:00:00 2001
From: "T. Jameson Little" <t.jameson.little@gmail.com>
Date: Mon, 31 Oct 2016 10:55:13 -0600
Subject: [PATCH 22/25] Fix data race panic

---
 client.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/client.go b/client.go
index 492176a..2cfbd75 100644
--- a/client.go
+++ b/client.go
@@ -318,6 +318,9 @@ func (c *Client) registerListener(id ID) {
 }
 
 func (c *Client) unregisterListener(id ID) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
 	log.Println("unregister listener:", id)
 	delete(c.listeners, id)
 }

From a8e7cbce2ba84230eab0579be1874bdff8cd1235 Mon Sep 17 00:00:00 2001
From: "T. Jameson Little" <t.jameson.little@gmail.com>
Date: Mon, 16 Jan 2017 16:00:55 -0700
Subject: [PATCH 23/25] Recover from send on closed channel

---
 session.go | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/session.go b/session.go
index 0912bce..38ffc7d 100644
--- a/session.go
+++ b/session.go
@@ -59,12 +59,21 @@ func (s *localPeer) Receive() <-chan Message {
 	return s.incoming
 }
 
-func (s *localPeer) Send(msg Message) error {
+func (s *localPeer) Send(msg Message) (err error) {
+	defer func() {
+		// just in case Close is called before Send
+		if r := recover(); r != nil {
+			err = fmt.Errorf("Attempt to write after Close()")
+		}
+	}()
 	s.outgoing <- msg
-	return nil
+	return
 }
 
 func (s *localPeer) Close() error {
-	close(s.outgoing)
+	if s.outgoing != nil {
+		close(s.outgoing)
+		s.outgoing = nil
+	}
 	return nil
 }

From aaa21ce8fe51ce110e43204dc642e765e34e7611 Mon Sep 17 00:00:00 2001
From: "T. Jameson Little" <t.jameson.little@gmail.com>
Date: Thu, 8 Jun 2017 11:07:25 -0600
Subject: [PATCH 24/25] Increase receive timeout to 30 seconds

This should probably be easier to set.
---
 client.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/client.go b/client.go
index 2cfbd75..2fdf9f8 100644
--- a/client.go
+++ b/client.go
@@ -67,7 +67,7 @@ func NewWebsocketClient(serialization Serialization, url string, tlscfg *tls.Con
 func NewClient(p Peer) *Client {
 	c := &Client{
 		Peer:           p,
-		ReceiveTimeout: 10 * time.Second,
+		ReceiveTimeout: 30 * time.Second,
 		listeners:      make(map[ID]chan Message),
 		events:         make(map[ID]*eventDesc),
 		procedures:     make(map[ID]*procedureDesc),

From 573f579df7ee6b7f2962625d35a624d11222f2ec Mon Sep 17 00:00:00 2001
From: Aaron Shumway <aclshumway@gmail.com>
Date: Wed, 6 Sep 2017 09:21:25 -0600
Subject: [PATCH 25/25] Manually close connection on unexpected errors

---
 websocket.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/websocket.go b/websocket.go
index 0a31a34..967949a 100644
--- a/websocket.go
+++ b/websocket.go
@@ -164,7 +164,8 @@ func (ep *websocketPeer) run() {
 				log.Println("peer connection closed")
 			} else {
 				log.Println("error reading from peer:", err)
-				if !websocket.IsCloseError(err) {
+				// only expected errors seem to close the connection, so close on any unexpected errors
+				if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
 					ep.conn.Close()
 				}
 			}