From 239195fd160b7d61f688bb77590fc33b7e5a112a Mon Sep 17 00:00:00 2001 From: "T. Jameson Little" <t.jameson.little@gmail.com> Date: Wed, 2 Dec 2015 15:29:03 -0700 Subject: [PATCH 01/25] All functions that took a Peer now take a *Session - allows us to generate session-specific ids --- authorizer.go | 4 +- broker.go | 25 +++--- broker_test.go | 24 ++++-- dealer.go | 225 ++++++++++++++++++++++-------------------------- dealer_test.go | 57 ++++++------ interceptor.go | 4 +- realm.go | 26 +++--- router.go | 2 +- session.go | 16 +++- session_test.go | 21 +++++ 10 files changed, 215 insertions(+), 189 deletions(-) create mode 100644 session_test.go diff --git a/authorizer.go b/authorizer.go index 8a3f39e..a814498 100644 --- a/authorizer.go +++ b/authorizer.go @@ -6,7 +6,7 @@ package turnpike // Authorize takes the session and the message (request), and returns true if // the request is authorized, otherwise false. type Authorizer interface { - Authorize(session Session, msg Message) (bool, error) + Authorize(session *Session, msg Message) (bool, error) } // DefaultAuthorizer always returns authorized. @@ -18,6 +18,6 @@ func NewDefaultAuthorizer() Authorizer { return &defaultAuthorizer{} } -func (da *defaultAuthorizer) Authorize(session Session, msg Message) (bool, error) { +func (da *defaultAuthorizer) Authorize(session *Session, msg Message) (bool, error) { return true, nil } diff --git a/broker.go b/broker.go index b7b7596..3c864d3 100644 --- a/broker.go +++ b/broker.go @@ -4,11 +4,11 @@ package turnpike // from Publishers to Subscribers. type Broker interface { // Publishes a message to all Subscribers. - Publish(Sender, *Publish) + Publish(*Session, *Publish) // Subscribes to messages on a URI. - Subscribe(Sender, *Subscribe) + Subscribe(*Session, *Subscribe) // Unsubscribes from messages on a URI. - Unsubscribe(Sender, *Unsubscribe) + Unsubscribe(*Session, *Unsubscribe) } // A super simple broker that matches URIs to Subscribers. @@ -30,8 +30,9 @@ func NewDefaultBroker() Broker { // // If msg.Options["acknowledge"] == true, the publisher receives a Published event // after the message has been sent to all subscribers. -func (br *defaultBroker) Publish(pub Sender, msg *Publish) { - pubID := NewID() +func (br *defaultBroker) Publish(sess *Session, msg *Publish) { + pub := sess.Peer + pubID := sess.NextRequestId() evtTemplate := Event{ Publication: pubID, Arguments: msg.Arguments, @@ -55,19 +56,19 @@ func (br *defaultBroker) Publish(pub Sender, msg *Publish) { } // Subscribe subscribes the client to the given topic. -func (br *defaultBroker) Subscribe(sub Sender, msg *Subscribe) { +func (br *defaultBroker) Subscribe(sess *Session, msg *Subscribe) { if _, ok := br.routes[msg.Topic]; !ok { br.routes[msg.Topic] = make(map[ID]Sender) } - id := NewID() - br.routes[msg.Topic][id] = sub + id := sess.NextRequestId() + br.routes[msg.Topic][id] = sess.Peer br.subscriptions[id] = msg.Topic - sub.Send(&Subscribed{Request: msg.Request, Subscription: id}) + sess.Peer.Send(&Subscribed{Request: msg.Request, Subscription: id}) } -func (br *defaultBroker) Unsubscribe(sub Sender, msg *Unsubscribe) { +func (br *defaultBroker) Unsubscribe(sess *Session, msg *Unsubscribe) { topic, ok := br.subscriptions[msg.Subscription] if !ok { err := &Error{ @@ -75,7 +76,7 @@ func (br *defaultBroker) Unsubscribe(sub Sender, msg *Unsubscribe) { Request: msg.Request, Error: ErrNoSuchSubscription, } - sub.Send(err) + sess.Peer.Send(err) log.Printf("Error unsubscribing: no such subscription %v", msg.Subscription) return } @@ -91,5 +92,5 @@ func (br *defaultBroker) Unsubscribe(sub Sender, msg *Unsubscribe) { delete(br.routes, topic) } } - sub.Send(&Unsubscribed{Request: msg.Request}) + sess.Peer.Send(&Unsubscribed{Request: msg.Request}) } diff --git a/broker_test.go b/broker_test.go index 5c8480c..d53807f 100644 --- a/broker_test.go +++ b/broker_test.go @@ -6,19 +6,28 @@ import ( . "github.com/smartystreets/goconvey/convey" ) -type TestSender struct { +type TestPeer struct { received Message + sent Message } -func (s *TestSender) Send(msg Message) error { s.received = msg; return nil } +func (s *TestPeer) Send(msg Message) error { + s.received = msg + return nil +} + +// TODO: implement me +func (s *TestPeer) Receive() <-chan Message { return nil } +func (s *TestPeer) Close() error { return nil } func TestSubscribe(t *testing.T) { Convey("Subscribing to a topic", t, func() { broker := NewDefaultBroker().(*defaultBroker) - subscriber := &TestSender{} + subscriber := &TestPeer{} + sess := &Session{Peer: subscriber} testTopic := URI("turnpike.test.topic") msg := &Subscribe{Request: 123, Topic: testTopic} - broker.Subscribe(subscriber, msg) + broker.Subscribe(sess, msg) Convey("The subscriber should have received a SUBSCRIBED message", func() { sub := subscriber.received.(*Subscribed).Subscription @@ -38,15 +47,16 @@ func TestSubscribe(t *testing.T) { func TestUnsubscribe(t *testing.T) { broker := NewDefaultBroker().(*defaultBroker) - subscriber := &TestSender{} + subscriber := &TestPeer{} testTopic := URI("turnpike.test.topic") msg := &Subscribe{Request: 123, Topic: testTopic} - broker.Subscribe(subscriber, msg) + sess := &Session{Peer: subscriber} + broker.Subscribe(sess, msg) sub := subscriber.received.(*Subscribed).Subscription Convey("Unsubscribing from a topic", t, func() { msg := &Unsubscribe{Request: 124, Subscription: sub} - broker.Unsubscribe(subscriber, msg) + broker.Unsubscribe(sess, msg) Convey("The peer should have received an UNSUBSCRIBED message", func() { unsub := subscriber.received.(*Unsubscribed).Request diff --git a/dealer.go b/dealer.go index e9fcd1e..d60c391 100644 --- a/dealer.go +++ b/dealer.go @@ -3,54 +3,56 @@ package turnpike // A Dealer routes and manages RPC calls to callees. type Dealer interface { // Register a procedure on an endpoint - Register(Sender, *Register) + Register(*Session, *Register) // Unregister a procedure on an endpoint - Unregister(Sender, *Unregister) + Unregister(*Session, *Unregister) // Call a procedure on an endpoint - Call(Sender, *Call) + Call(*Session, *Call) // Return the result of a procedure call - Yield(Sender, *Yield) + Yield(*Session, *Yield) // Handle an ERROR message from an invocation - Error(Sender, *Error) + Error(*Session, *Error) // Remove a callee's registrations - RemovePeer(Sender) + RemoveSession(*Session) } type remoteProcedure struct { - Endpoint Sender - Procedure URI + Endpoint *Session + Procedure URI + Registration ID +} + +type rpcRequest struct { + caller *Session + requestId ID } type defaultDealer struct { // map registration IDs to procedures - procedures map[ID]remoteProcedure + procedures map[URI]remoteProcedure // map procedure URIs to registration IDs // TODO: this will eventually need to be `map[URI][]ID` to support // multiple callees for the same procedure - registrations map[URI]ID - // keep track of call IDs so we can send the response to the caller - calls map[ID]Sender + registrations map[ID]URI + // link the invocation ID to the call ID - invocations map[ID]ID - // keep track of callee's registrations - callees map[Sender]map[ID]bool + invocations map[*Session]map[ID]rpcRequest + // callees map[*Session]map[ID]bool } // NewDefaultDealer returns the default turnpike dealer implementation func NewDefaultDealer() Dealer { return &defaultDealer{ - procedures: make(map[ID]remoteProcedure), - registrations: make(map[URI]ID), - calls: make(map[ID]Sender), - invocations: make(map[ID]ID), - callees: make(map[Sender]map[ID]bool), + procedures: make(map[URI]remoteProcedure), + registrations: make(map[ID]URI), + invocations: make(map[*Session]map[ID]rpcRequest), } } -func (d *defaultDealer) Register(callee Sender, msg *Register) { - if id, ok := d.registrations[msg.Procedure]; ok { +func (d *defaultDealer) Register(sess *Session, msg *Register) { + if id, ok := d.procedures[msg.Procedure]; ok { log.Println("error: procedure already exists:", msg.Procedure, id) - callee.Send(&Error{ + sess.Peer.Send(&Error{ Type: msg.MessageType(), Request: msg.Request, Details: make(map[string]interface{}), @@ -58,145 +60,120 @@ func (d *defaultDealer) Register(callee Sender, msg *Register) { }) return } - reg := NewID() - d.procedures[reg] = remoteProcedure{callee, msg.Procedure} - d.registrations[msg.Procedure] = reg - d.addCalleeRegistration(callee, reg) - log.Printf("registered procedure %v [%v]", reg, msg.Procedure) - callee.Send(&Registered{ + + registrationId := NewID() + d.procedures[msg.Procedure] = remoteProcedure{sess, msg.Procedure, registrationId} + d.registrations[registrationId] = msg.Procedure + + // d.addCalleeRegistration(sess, reg) + log.Printf("registered procedure %v [%v]", registrationId, msg.Procedure) + sess.Peer.Send(&Registered{ Request: msg.Request, - Registration: reg, + Registration: registrationId, }) } -func (d *defaultDealer) Unregister(callee Sender, msg *Unregister) { - if procedure, ok := d.procedures[msg.Registration]; !ok { +func (d *defaultDealer) Unregister(sess *Session, msg *Unregister) { + if procedure, ok := d.registrations[msg.Registration]; !ok { // the registration doesn't exist log.Println("error: no such registration:", msg.Registration) - callee.Send(&Error{ + sess.Peer.Send(&Error{ Type: msg.MessageType(), Request: msg.Request, Details: make(map[string]interface{}), Error: ErrNoSuchRegistration, }) } else { - delete(d.registrations, procedure.Procedure) - delete(d.procedures, msg.Registration) - d.removeCalleeRegistration(callee, msg.Registration) - log.Printf("unregistered procedure %v [%v]", procedure.Procedure, msg.Registration) - callee.Send(&Unregistered{ + delete(d.registrations, msg.Registration) + delete(d.procedures, procedure) + // d.removeCalleeRegistration(sess, msg.Registration) + log.Printf("unregistered procedure %v [%v]", procedure, msg.Registration) + sess.Peer.Send(&Unregistered{ Request: msg.Request, }) } } -func (d *defaultDealer) Call(caller Sender, msg *Call) { - if reg, ok := d.registrations[msg.Procedure]; !ok { - caller.Send(&Error{ +func (d *defaultDealer) Call(sess *Session, msg *Call) { + if rproc, ok := d.procedures[msg.Procedure]; !ok { + sess.Peer.Send(&Error{ Type: msg.MessageType(), Request: msg.Request, Details: make(map[string]interface{}), Error: ErrNoSuchProcedure, }) } else { - if rproc, ok := d.procedures[reg]; !ok { - // found a registration id, but doesn't match any remote procedure - caller.Send(&Error{ - Type: msg.MessageType(), - Request: msg.Request, - Details: make(map[string]interface{}), - // TODO: what should this error be? - Error: URI("wamp.error.internal_error"), - }) - } else { - // everything checks out, make the invocation request - // TODO: make the Request ID specific to the caller - d.calls[msg.Request] = caller - invocationID := NewID() - d.invocations[invocationID] = msg.Request - rproc.Endpoint.Send(&Invocation{ - Request: invocationID, - Registration: reg, - Details: map[string]interface{}{}, - Arguments: msg.Arguments, - ArgumentsKw: msg.ArgumentsKw, - }) - log.Printf("dispatched CALL %v [%v] to callee as INVOCATION %v", - msg.Request, msg.Procedure, invocationID, - ) + // everything checks out, make the invocation request + // TODO: make the Request ID specific to the caller + // d.calls[msg.Request] = sess + invocationID := rproc.Endpoint.NextRequestId() + if d.invocations[rproc.Endpoint] == nil { + d.invocations[rproc.Endpoint] = make(map[ID]rpcRequest) } + d.invocations[rproc.Endpoint][invocationID] = rpcRequest{sess, msg.Request} + rproc.Endpoint.Send(&Invocation{ + Request: invocationID, + Registration: rproc.Registration, + Details: map[string]interface{}{}, + Arguments: msg.Arguments, + ArgumentsKw: msg.ArgumentsKw, + }) + log.Printf("dispatched CALL %v [%v] to callee as INVOCATION %v", + msg.Request, msg.Procedure, invocationID, + ) } } -func (d *defaultDealer) Yield(callee Sender, msg *Yield) { - if callID, ok := d.invocations[msg.Request]; !ok { +func (d *defaultDealer) Yield(sess *Session, msg *Yield) { + if d.invocations[sess] == nil { + log.Println("received YIELD message from unknown session:", sess.Id) + return + } + if call, ok := d.invocations[sess][msg.Request]; !ok { // WAMP spec doesn't allow sending an error in response to a YIELD message log.Println("received YIELD message with invalid invocation request ID:", msg.Request) } else { - delete(d.invocations, msg.Request) - if caller, ok := d.calls[callID]; !ok { - // found the invocation id, but doesn't match any call id - // WAMP spec doesn't allow sending an error in response to a YIELD message - log.Printf("received YIELD message, but unable to match it (%v) to a CALL ID", msg.Request) - } else { - delete(d.calls, callID) - // return the result to the caller - caller.Send(&Result{ - Request: callID, - Details: map[string]interface{}{}, - Arguments: msg.Arguments, - ArgumentsKw: msg.ArgumentsKw, - }) - log.Printf("returned YIELD %v to caller as RESULT %v", msg.Request, callID) - } + // TODO: delete old keys; could do here, but should probably lock the map + delete(d.invocations[sess], msg.Request) + // return the result to the caller + call.caller.Send(&Result{ + Request: call.requestId, + Details: map[string]interface{}{}, + Arguments: msg.Arguments, + ArgumentsKw: msg.ArgumentsKw, + }) + log.Printf("returned YIELD %v to caller as RESULT %v", msg.Request, call.requestId) } } -func (d *defaultDealer) Error(peer Sender, msg *Error) { - if callID, ok := d.invocations[msg.Request]; !ok { +func (d *defaultDealer) Error(sess *Session, msg *Error) { + if d.invocations[sess] == nil { + log.Println("received YIELD message from unknown session:", sess.Id) + return + } + if call, ok := d.invocations[sess][msg.Request]; !ok { log.Println("received ERROR (INVOCATION) message with invalid invocation request ID:", msg.Request) } else { - delete(d.invocations, msg.Request) - if caller, ok := d.calls[callID]; !ok { - log.Printf("received ERROR (INVOCATION) message, but unable to match it (%v) to a CALL ID", msg.Request) - } else { - delete(d.calls, callID) - // return an error to the caller - caller.Send(&Error{ - Type: CALL, - Request: callID, - Details: make(map[string]interface{}), - Arguments: msg.Arguments, - ArgumentsKw: msg.ArgumentsKw, - }) - log.Printf("returned ERROR %v to caller as ERROR %v", msg.Request, callID) - } + delete(d.invocations[sess], msg.Request) + // return an error to the caller + call.caller.Peer.Send(&Error{ + Type: CALL, + Request: call.requestId, + Details: make(map[string]interface{}), + Arguments: msg.Arguments, + ArgumentsKw: msg.ArgumentsKw, + }) + log.Printf("returned ERROR %v to caller as ERROR %v", msg.Request, call.requestId) } } -func (d *defaultDealer) RemovePeer(callee Sender) { - for reg := range d.callees[callee] { - if procedure, ok := d.procedures[reg]; ok { - delete(d.registrations, procedure.Procedure) - delete(d.procedures, reg) +func (d *defaultDealer) RemoveSession(sess *Session) { + // TODO: this is low hanging fruit for optimization + // TODO: potential data race here, should lock the map + for _, rproc := range d.procedures { + if rproc.Endpoint == sess { + delete(d.registrations, rproc.Registration) + delete(d.procedures, rproc.Procedure) } - d.removeCalleeRegistration(callee, reg) - } -} - -func (d *defaultDealer) addCalleeRegistration(callee Sender, reg ID) { - if _, ok := d.callees[callee]; !ok { - d.callees[callee] = make(map[ID]bool) - } - d.callees[callee][reg] = true -} - -func (d *defaultDealer) removeCalleeRegistration(callee Sender, reg ID) { - if _, ok := d.callees[callee]; !ok { - return - } - delete(d.callees[callee], reg) - if len(d.callees[callee]) == 0 { - delete(d.callees, callee) } } diff --git a/dealer_test.go b/dealer_test.go index e5ccbad..b862459 100644 --- a/dealer_test.go +++ b/dealer_test.go @@ -9,10 +9,11 @@ import ( func TestRegister(t *testing.T) { Convey("Registering a procedure", t, func() { dealer := NewDefaultDealer().(*defaultDealer) - callee := &TestSender{} + callee := &TestPeer{} testProcedure := URI("turnpike.test.endpoint") msg := &Register{Request: 123, Procedure: testProcedure} - dealer.Register(callee, msg) + sess := &Session{Peer: callee} + dealer.Register(sess, msg) Convey("The callee should have received a REGISTERED message", func() { reg := callee.received.(*Registered).Registration @@ -21,17 +22,17 @@ func TestRegister(t *testing.T) { Convey("The dealer should have the endpoint registered", func() { reg := callee.received.(*Registered).Registration - reg2, ok := dealer.registrations[testProcedure] + proc, ok := dealer.procedures[testProcedure] So(ok, ShouldBeTrue) - So(reg, ShouldEqual, reg2) - proc, ok := dealer.procedures[reg] + So(reg, ShouldEqual, proc.Registration) + procedure, ok := dealer.registrations[reg] So(ok, ShouldBeTrue) - So(proc.Procedure, ShouldEqual, testProcedure) + So(procedure, ShouldEqual, testProcedure) }) Convey("The same procedure cannot be registered more than once", func() { msg := &Register{Request: 321, Procedure: testProcedure} - dealer.Register(callee, msg) + dealer.Register(sess, msg) err := callee.received.(*Error) So(err.Error, ShouldEqual, ErrProcedureAlreadyExists) So(err.Details, ShouldNotBeNil) @@ -41,15 +42,16 @@ func TestRegister(t *testing.T) { func TestUnregister(t *testing.T) { dealer := NewDefaultDealer().(*defaultDealer) - callee := &TestSender{} + callee := &TestPeer{} testProcedure := URI("turnpike.test.endpoint") msg := &Register{Request: 123, Procedure: testProcedure} - dealer.Register(callee, msg) + sess := &Session{Peer: callee} + dealer.Register(sess, msg) reg := callee.received.(*Registered).Registration Convey("Unregistering a procedure", t, func() { msg := &Unregister{Request: 124, Registration: reg} - dealer.Unregister(callee, msg) + dealer.Unregister(sess, msg) Convey("The callee should have received an UNREGISTERED message", func() { unreg := callee.received.(*Unregistered).Request @@ -57,9 +59,9 @@ func TestUnregister(t *testing.T) { }) Convey("The dealer should no longer have the endpoint registered", func() { - _, ok := dealer.registrations[testProcedure] + _, ok := dealer.procedures[testProcedure] So(ok, ShouldBeFalse) - _, ok = dealer.procedures[reg] + _, ok = dealer.registrations[reg] So(ok, ShouldBeFalse) }) }) @@ -68,15 +70,17 @@ func TestUnregister(t *testing.T) { func TestCall(t *testing.T) { Convey("With a procedure registered", t, func() { dealer := NewDefaultDealer().(*defaultDealer) - callee := &TestSender{} + callee := &TestPeer{} testProcedure := URI("turnpike.test.endpoint") msg := &Register{Request: 123, Procedure: testProcedure} - dealer.Register(callee, msg) - caller := &TestSender{} + sess := &Session{Peer: callee} + dealer.Register(sess, msg) + caller := &TestPeer{} + callerSession := &Session{Peer: caller} Convey("Calling an invalid procedure", func() { msg := &Call{Request: 124, Procedure: URI("turnpike.test.bad")} - dealer.Call(caller, msg) + dealer.Call(callerSession, msg) Convey("The caller should have received an ERROR message", func() { err := caller.received.(*Error) @@ -87,7 +91,7 @@ func TestCall(t *testing.T) { Convey("Calling a valid procedure", func() { msg := &Call{Request: 125, Procedure: testProcedure} - dealer.Call(caller, msg) + dealer.Call(callerSession, msg) Convey("The callee should have received an INVOCATION message", func() { So(callee.received.MessageType(), ShouldEqual, INVOCATION) @@ -95,7 +99,7 @@ func TestCall(t *testing.T) { Convey("And the callee responds with a YIELD message", func() { msg := &Yield{Request: inv.Request} - dealer.Yield(callee, msg) + dealer.Yield(sess, msg) Convey("The caller should have received a RESULT message", func() { So(caller.received.MessageType(), ShouldEqual, RESULT) @@ -105,7 +109,7 @@ func TestCall(t *testing.T) { Convey("And the callee responds with an ERROR message", func() { msg := &Error{Request: inv.Request} - dealer.Error(callee, msg) + dealer.Error(sess, msg) Convey("The caller should have received an ERROR message", func() { So(caller.received.MessageType(), ShouldEqual, ERROR) @@ -120,24 +124,23 @@ func TestCall(t *testing.T) { func TestRemovePeer(t *testing.T) { Convey("With a procedure registered", t, func() { dealer := NewDefaultDealer().(*defaultDealer) - callee := &TestSender{} + callee := &TestPeer{} testProcedure := URI("turnpike.test.endpoint") msg := &Register{Request: 123, Procedure: testProcedure} - dealer.Register(callee, msg) + sess := &Session{Peer: callee} + dealer.Register(sess, msg) reg := callee.received.(*Registered).Registration - So(dealer.registrations, ShouldContainKey, testProcedure) - So(dealer.procedures, ShouldContainKey, reg) - So(dealer.callees[callee], ShouldContainKey, reg) + So(dealer.procedures, ShouldContainKey, testProcedure) + So(dealer.registrations, ShouldContainKey, reg) Convey("Calling RemoveSession should remove the registration", func() { - dealer.RemovePeer(callee) + dealer.RemoveSession(sess) So(dealer.registrations, ShouldNotContainKey, testProcedure) So(dealer.procedures, ShouldNotContainKey, reg) - So(dealer.callees[callee], ShouldNotContainKey, reg) Convey("And registering the endpoint again should succeed", func() { msg.Request = 124 - dealer.Register(callee, msg) + dealer.Register(sess, msg) So(callee.received.MessageType(), ShouldEqual, REGISTERED) }) }) diff --git a/interceptor.go b/interceptor.go index 52bae31..107510e 100644 --- a/interceptor.go +++ b/interceptor.go @@ -6,7 +6,7 @@ package turnpike // Intercept takes the session and (a pointer to) the message, and (possibly) // modifies the message. type Interceptor interface { - Intercept(session Session, msg *Message) + Intercept(session *Session, msg *Message) } // DefaultInterceptor does nothing :) @@ -18,5 +18,5 @@ func NewDefaultInterceptor() Interceptor { return &defaultInterceptor{} } -func (di *defaultInterceptor) Intercept(session Session, msg *Message) { +func (di *defaultInterceptor) Intercept(session *Session, msg *Message) { } diff --git a/realm.go b/realm.go index 0eed0c6..12f00c4 100644 --- a/realm.go +++ b/realm.go @@ -24,7 +24,7 @@ type Realm struct { Authenticators map[string]Authenticator // DefaultAuth func(details map[string]interface{}) (map[string]interface{}, error) AuthTimeout time.Duration - clients map[ID]Session + clients map[ID]*Session localClient } @@ -34,7 +34,7 @@ type localClient struct { func (r *Realm) getPeer(details map[string]interface{}) (Peer, error) { peerA, peerB := localPipe() - sess := Session{Peer: peerA, Id: NewID(), Details: details, kill: make(chan URI, 1)} + sess := &Session{Peer: peerA, Id: NewID(), Details: details, kill: make(chan URI, 1)} if details == nil { details = make(map[string]interface{}) } @@ -51,7 +51,7 @@ func (r Realm) Close() { } func (r *Realm) init() { - r.clients = make(map[ID]Session) + r.clients = make(map[ID]*Session) p, _ := r.getPeer(nil) r.localClient.Client = NewClient(p) if r.Broker == nil { @@ -82,12 +82,12 @@ func (l *localClient) onLeave(session ID) { l.Publish("wamp.session.on_leave", []interface{}{session}, nil) } -func (r *Realm) handleSession(sess Session) { +func (r *Realm) handleSession(sess *Session) { r.clients[sess.Id] = sess r.onJoin(sess.Details) defer func() { delete(r.clients, sess.Id) - r.Dealer.RemovePeer(sess.Peer) + r.Dealer.RemoveSession(sess) r.onLeave(sess.Id) }() c := sess.Receive() @@ -133,27 +133,27 @@ func (r *Realm) handleSession(sess Session) { // Broker messages case *Publish: - r.Broker.Publish(sess.Peer, msg) + r.Broker.Publish(sess, msg) case *Subscribe: - r.Broker.Subscribe(sess.Peer, msg) + r.Broker.Subscribe(sess, msg) case *Unsubscribe: - r.Broker.Unsubscribe(sess.Peer, msg) + r.Broker.Unsubscribe(sess, msg) // Dealer messages case *Register: - r.Dealer.Register(sess.Peer, msg) + r.Dealer.Register(sess, msg) case *Unregister: - r.Dealer.Unregister(sess.Peer, msg) + r.Dealer.Unregister(sess, msg) case *Call: - r.Dealer.Call(sess.Peer, msg) + r.Dealer.Call(sess, msg) case *Yield: - r.Dealer.Yield(sess.Peer, msg) + r.Dealer.Yield(sess, msg) // Error messages case *Error: if msg.Type == INVOCATION { // the only type of ERROR message the router should receive - r.Dealer.Error(sess.Peer, msg) + r.Dealer.Error(sess, msg) } else { log.Printf("invalid ERROR message received: %v", msg) } diff --git a/router.go b/router.go index 9eddec3..ee06c50 100644 --- a/router.go +++ b/router.go @@ -149,7 +149,7 @@ func (r *defaultRouter) Accept(client Peer) error { // session details welcome.Details["session"] = welcome.Id welcome.Details["realm"] = hello.Realm - sess := Session{ + sess := &Session{ Peer: client, Id: welcome.Id, Details: welcome.Details, diff --git a/session.go b/session.go index 42564c4..a9a0237 100644 --- a/session.go +++ b/session.go @@ -4,19 +4,33 @@ import ( "fmt" ) +const ( + MAX_REQUEST_ID = 1 << 53 +) + // Session represents an active WAMP session type Session struct { Peer Id ID Details map[string]interface{} - kill chan URI + lastRequestId ID + kill chan URI } func (s Session) String() string { return fmt.Sprintf("%d", s.Id) } +func (s *Session) NextRequestId() ID { + s.lastRequestId++ + // max value is 2^53 + if s.lastRequestId > MAX_REQUEST_ID { + s.lastRequestId = 1 + } + return s.lastRequestId +} + // localPipe creates two linked sessions. Messages sent to one will // appear in the Receive of the other. This is useful for implementing // client sessions diff --git a/session_test.go b/session_test.go new file mode 100644 index 0000000..9518a14 --- /dev/null +++ b/session_test.go @@ -0,0 +1,21 @@ +package turnpike + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestNextRequestId(t *testing.T) { + Convey("Incrementing session id", t, func() { + sess := &Session{} + Convey("Should increment on subsequent calls", func() { + So(sess.NextRequestId(), ShouldEqual, 1) + So(sess.NextRequestId(), ShouldEqual, 2) + }) + Convey("Should roll over upon reaching the max session id size", func() { + sess.lastRequestId = MAX_REQUEST_ID + So(sess.NextRequestId(), ShouldEqual, 1) + }) + }) +} From 4621c9349b1cd4400c0ce8acc9d1b5f8ab1e0fd6 Mon Sep 17 00:00:00 2001 From: "T. Jameson Little" <t.jameson.little@gmail.com> Date: Tue, 12 Jan 2016 15:39:35 -0700 Subject: [PATCH 02/25] Fix websocket send concurrency issue - fixes a problem when running with AutobahnPython and twisted --- websocket.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/websocket.go b/websocket.go index b34d908..b66088c 100644 --- a/websocket.go +++ b/websocket.go @@ -2,6 +2,7 @@ package turnpike import ( "fmt" + "sync" "time" "github.com/gorilla/websocket" @@ -13,6 +14,7 @@ type websocketPeer struct { messages chan Message payloadType int closed bool + mutex sync.Mutex } // NewWebsocketPeer connects to the websocket server at the specified url. @@ -56,6 +58,8 @@ func (ep *websocketPeer) Send(msg Message) error { if err != nil { return err } + ep.mutex.Lock() + defer ep.mutex.Unlock() return ep.conn.WriteMessage(ep.payloadType, b) } func (ep *websocketPeer) Receive() <-chan Message { From 695927032d29e2855b06c2d6582165dc5b9c06a0 Mon Sep 17 00:00:00 2001 From: "T. Jameson Little" <t.jameson.little@gmail.com> Date: Wed, 13 Jan 2016 11:48:26 -0700 Subject: [PATCH 03/25] Added mutex to invocation map - prevent data race; maps aren't safe for concurrent use --- dealer.go | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/dealer.go b/dealer.go index d60c391..4e39938 100644 --- a/dealer.go +++ b/dealer.go @@ -1,5 +1,9 @@ package turnpike +import ( + "sync" +) + // A Dealer routes and manages RPC calls to callees. type Dealer interface { // Register a procedure on an endpoint @@ -38,6 +42,10 @@ type defaultDealer struct { // link the invocation ID to the call ID invocations map[*Session]map[ID]rpcRequest // callees map[*Session]map[ID]bool + + // single lock for all invocations; could use RWLock, but in most (all?) cases we want a write lock + // TODO: add the lock per session + invocationLock sync.Mutex } // NewDefaultDealer returns the default turnpike dealer implementation @@ -95,6 +103,9 @@ func (d *defaultDealer) Unregister(sess *Session, msg *Unregister) { } func (d *defaultDealer) Call(sess *Session, msg *Call) { + d.invocationLock.Lock() + defer d.invocationLock.Unlock() + if rproc, ok := d.procedures[msg.Procedure]; !ok { sess.Peer.Send(&Error{ Type: msg.MessageType(), @@ -125,6 +136,9 @@ func (d *defaultDealer) Call(sess *Session, msg *Call) { } func (d *defaultDealer) Yield(sess *Session, msg *Yield) { + d.invocationLock.Lock() + defer d.invocationLock.Unlock() + if d.invocations[sess] == nil { log.Println("received YIELD message from unknown session:", sess.Id) return @@ -133,10 +147,11 @@ func (d *defaultDealer) Yield(sess *Session, msg *Yield) { // WAMP spec doesn't allow sending an error in response to a YIELD message log.Println("received YIELD message with invalid invocation request ID:", msg.Request) } else { - // TODO: delete old keys; could do here, but should probably lock the map + // delete old keys delete(d.invocations[sess], msg.Request) + // return the result to the caller - call.caller.Send(&Result{ + go call.caller.Send(&Result{ Request: call.requestId, Details: map[string]interface{}{}, Arguments: msg.Arguments, @@ -144,19 +159,27 @@ func (d *defaultDealer) Yield(sess *Session, msg *Yield) { }) log.Printf("returned YIELD %v to caller as RESULT %v", msg.Request, call.requestId) } + + if len(d.invocations[sess]) == 0 { + delete(d.invocations, sess) + } } func (d *defaultDealer) Error(sess *Session, msg *Error) { + d.invocationLock.Lock() + defer d.invocationLock.Unlock() + if d.invocations[sess] == nil { - log.Println("received YIELD message from unknown session:", sess.Id) + log.Println("received ERROR message from unknown session:", sess.Id) return } if call, ok := d.invocations[sess][msg.Request]; !ok { log.Println("received ERROR (INVOCATION) message with invalid invocation request ID:", msg.Request) } else { delete(d.invocations[sess], msg.Request) + // return an error to the caller - call.caller.Peer.Send(&Error{ + go call.caller.Peer.Send(&Error{ Type: CALL, Request: call.requestId, Details: make(map[string]interface{}), @@ -165,6 +188,10 @@ func (d *defaultDealer) Error(sess *Session, msg *Error) { }) log.Printf("returned ERROR %v to caller as ERROR %v", msg.Request, call.requestId) } + + if len(d.invocations[sess]) == 0 { + delete(d.invocations, sess) + } } func (d *defaultDealer) RemoveSession(sess *Session) { From 1f80efd85a11943c37d4d592cb8be80315785ed3 Mon Sep 17 00:00:00 2001 From: "T. Jameson Little" <t.jameson.little@gmail.com> Date: Wed, 13 Jan 2016 16:27:38 -0700 Subject: [PATCH 04/25] Try increasing buffers --- session.go | 4 ++-- websocket.go | 2 +- websocket_server.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/session.go b/session.go index a9a0237..0912bce 100644 --- a/session.go +++ b/session.go @@ -35,8 +35,8 @@ func (s *Session) NextRequestId() ID { // appear in the Receive of the other. This is useful for implementing // client sessions func localPipe() (*localPeer, *localPeer) { - aToB := make(chan Message, 10) - bToA := make(chan Message, 10) + aToB := make(chan Message, 100) + bToA := make(chan Message, 100) a := &localPeer{ incoming: bToA, diff --git a/websocket.go b/websocket.go index b66088c..f78c986 100644 --- a/websocket.go +++ b/websocket.go @@ -43,7 +43,7 @@ func newWebsocketPeer(url, protocol, origin string, serializer Serializer, paylo } ep := &websocketPeer{ conn: conn, - messages: make(chan Message, 10), + messages: make(chan Message, 100), serializer: serializer, payloadType: payloadType, } diff --git a/websocket_server.go b/websocket_server.go index 14849a9..c04414c 100644 --- a/websocket_server.go +++ b/websocket_server.go @@ -137,7 +137,7 @@ func (s *WebsocketServer) handleWebsocket(conn *websocket.Conn) { peer := websocketPeer{ conn: conn, serializer: serializer, - messages: make(chan Message, 10), + messages: make(chan Message, 100), payloadType: payloadType, } go peer.run() From 7731912e098a4338cc96aa630c0fb6cd4ee909eb Mon Sep 17 00:00:00 2001 From: "T. Jameson Little" <t.jameson.little@gmail.com> Date: Thu, 21 Jan 2016 16:16:50 -0700 Subject: [PATCH 05/25] Add error URI to errors in RPC commands --- dealer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dealer.go b/dealer.go index 4e39938..dcfc81b 100644 --- a/dealer.go +++ b/dealer.go @@ -182,6 +182,7 @@ func (d *defaultDealer) Error(sess *Session, msg *Error) { go call.caller.Peer.Send(&Error{ Type: CALL, Request: call.requestId, + Error: msg.Error, Details: make(map[string]interface{}), Arguments: msg.Arguments, ArgumentsKw: msg.ArgumentsKw, From dd3c64aca27d322c9cd8983c45321947c5d60e6a Mon Sep 17 00:00:00 2001 From: JakobGreen <jakeg@spotterrf.com> Date: Fri, 5 Feb 2016 14:44:19 -0700 Subject: [PATCH 06/25] Fixed error reporting on RPC calls --- client.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/client.go b/client.go index e59fb0b..ff23500 100644 --- a/client.go +++ b/client.go @@ -482,6 +482,15 @@ func (c *Client) Publish(topic string, args []interface{}, kwargs map[string]int }) } +type RPCError struct { + ErrorMessage *Error + Procedure string +} + +func (rpc RPCError) Error() string { + return fmt.Sprintf("error calling procedure '%v': %v: %v: %v", rpc.Procedure, rpc.ErrorMessage.Error, rpc.ErrorMessage.Arguments, rpc.ErrorMessage.ArgumentsKw) +} + // Call calls a procedure given a URI. func (c *Client) Call(procedure string, args []interface{}, kwargs map[string]interface{}) (*Result, error) { id := NewID() @@ -503,7 +512,7 @@ func (c *Client) Call(procedure string, args []interface{}, kwargs map[string]in if err != nil { return nil, err } else if e, ok := msg.(*Error); ok { - return nil, fmt.Errorf("error calling procedure '%v': %v", procedure, e.Error) + return nil, RPCError{e, procedure} } else if result, ok := msg.(*Result); !ok { return nil, fmt.Errorf(formatUnexpectedMessage(msg, RESULT)) } else { From 96f011e1c1fc3d75d7d3df0fb22218bd0cfae35a Mon Sep 17 00:00:00 2001 From: gngeorgiev <gngeorgiev.it@gmail.com> Date: Sat, 26 Mar 2016 01:39:33 +0200 Subject: [PATCH 07/25] fixed concurrent writes to map in realm --- dealer_test.go | 9 +++++++++ realm.go | 25 ++++++++++++++++--------- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/dealer_test.go b/dealer_test.go index b862459..49cb054 100644 --- a/dealer_test.go +++ b/dealer_test.go @@ -2,6 +2,7 @@ package turnpike import ( "testing" + "time" . "github.com/smartystreets/goconvey/convey" ) @@ -101,7 +102,11 @@ func TestCall(t *testing.T) { msg := &Yield{Request: inv.Request} dealer.Yield(sess, msg) + // give it some time to propagate + time.Sleep(time.Millisecond) + Convey("The caller should have received a RESULT message", func() { + So(caller.received, ShouldNotBeNil) So(caller.received.MessageType(), ShouldEqual, RESULT) So(caller.received.(*Result).Request, ShouldEqual, 125) }) @@ -111,7 +116,11 @@ func TestCall(t *testing.T) { msg := &Error{Request: inv.Request} dealer.Error(sess, msg) + // give it some time to propagate + time.Sleep(time.Millisecond) + Convey("The caller should have received an ERROR message", func() { + So(caller.received, ShouldNotBeNil) So(caller.received.MessageType(), ShouldEqual, ERROR) So(caller.received.(*Error).Request, ShouldEqual, 125) }) diff --git a/realm.go b/realm.go index 12f00c4..433686b 100644 --- a/realm.go +++ b/realm.go @@ -3,6 +3,8 @@ package turnpike import ( "fmt" "time" + + "github.com/streamrail/concurrent-map" ) const ( @@ -24,7 +26,7 @@ type Realm struct { Authenticators map[string]Authenticator // DefaultAuth func(details map[string]interface{}) (map[string]interface{}, error) AuthTimeout time.Duration - clients map[ID]*Session + clients cmap.ConcurrentMap localClient } @@ -45,13 +47,18 @@ func (r *Realm) getPeer(details map[string]interface{}) (Peer, error) { // Close disconnects all clients after sending a goodbye message func (r Realm) Close() { - for _, client := range r.clients { - client.kill <- ErrSystemShutdown + iter := r.clients.Iter() + for client := range iter { + sess, isSession := client.Val.(*Session) + if !isSession { + continue + } + sess.kill <- ErrSystemShutdown } } func (r *Realm) init() { - r.clients = make(map[ID]*Session) + r.clients = cmap.New() p, _ := r.getPeer(nil) r.localClient.Client = NewClient(p) if r.Broker == nil { @@ -83,10 +90,10 @@ func (l *localClient) onLeave(session ID) { } func (r *Realm) handleSession(sess *Session) { - r.clients[sess.Id] = sess + r.clients.Set(string(sess.Id), sess) r.onJoin(sess.Details) defer func() { - delete(r.clients, sess.Id) + r.clients.Remove(string(sess.Id)) r.Dealer.RemoveSession(sess) r.onLeave(sess.Id) }() @@ -255,9 +262,9 @@ func addAuthMethod(details map[string]interface{}, method string) map[string]int } // r := Realm{ -// Authenticators: map[string]turnpike.Authenticator{ -// "wampcra": turnpike.NewCRAAuthenticatorFactoryFactory(mySecret), -// "ticket": turnpike.NewTicketAuthenticator(myTicket), +// Authenticators: map[string]gowamp.Authenticator{ +// "wampcra": gowamp.NewCRAAuthenticatorFactoryFactory(mySecret), +// "ticket": gowamp.NewTicketAuthenticator(myTicket), // "asdfasdf": myAsdfAuthenticator, // }, // BasicAuthenticators: map[string]turnpike.BasicAuthenticator{ From 742b9c3bef3c0bf76b26cbad78c3bf299690971d Mon Sep 17 00:00:00 2001 From: gngeorgiev <gngeorgiev.it@gmail.com> Date: Sat, 26 Mar 2016 01:30:06 +0200 Subject: [PATCH 08/25] added exclude_me support --- broker.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/broker.go b/broker.go index 3c864d3..9060132 100644 --- a/broker.go +++ b/broker.go @@ -39,12 +39,18 @@ func (br *defaultBroker) Publish(sess *Session, msg *Publish) { ArgumentsKw: msg.ArgumentsKw, Details: make(map[string]interface{}), } + + excludePublisher := true + if exclude, ok := msg.Options["exclude_me"].(bool); ok { + excludePublisher = exclude + } + for id, sub := range br.routes[msg.Topic] { // shallow-copy the template event := evtTemplate event.Subscription = id // don't send event to publisher - if sub != pub { + if sub != pub || !excludePublisher { sub.Send(&event) } } From bf55c97447e62b9d7c32846451f70e60088a9c4b Mon Sep 17 00:00:00 2001 From: "T. Jameson Little" <t.jameson.little@gmail.com> Date: Thu, 31 Mar 2016 14:11:26 -0600 Subject: [PATCH 09/25] Fixed data races --- broker.go | 15 ++++ broker_test.go | 13 ++++ data_races.txt | 44 ++++++++++++ dealer.go | 14 ++++ dealer_test.go | 14 ++-- realm.go | 188 ++++++++++++++++++++++++++++--------------------- router.go | 38 ++++++---- 7 files changed, 228 insertions(+), 98 deletions(-) create mode 100644 data_races.txt diff --git a/broker.go b/broker.go index 9060132..b28827c 100644 --- a/broker.go +++ b/broker.go @@ -1,5 +1,9 @@ package turnpike +import ( + "sync" +) + // Broker is the interface implemented by an object that handles routing EVENTS // from Publishers to Subscribers. type Broker interface { @@ -15,6 +19,8 @@ type Broker interface { type defaultBroker struct { routes map[URI]map[ID]Sender subscriptions map[ID]URI + + sync.RWMutex } // NewDefaultBroker initializes and returns a simple broker that matches URIs to @@ -31,6 +37,9 @@ func NewDefaultBroker() Broker { // If msg.Options["acknowledge"] == true, the publisher receives a Published event // after the message has been sent to all subscribers. func (br *defaultBroker) Publish(sess *Session, msg *Publish) { + br.RLock() + defer br.RUnlock() + pub := sess.Peer pubID := sess.NextRequestId() evtTemplate := Event{ @@ -63,6 +72,9 @@ func (br *defaultBroker) Publish(sess *Session, msg *Publish) { // Subscribe subscribes the client to the given topic. func (br *defaultBroker) Subscribe(sess *Session, msg *Subscribe) { + br.Lock() + defer br.Unlock() + if _, ok := br.routes[msg.Topic]; !ok { br.routes[msg.Topic] = make(map[ID]Sender) } @@ -75,6 +87,9 @@ func (br *defaultBroker) Subscribe(sess *Session, msg *Subscribe) { } func (br *defaultBroker) Unsubscribe(sess *Session, msg *Unsubscribe) { + br.Lock() + defer br.Unlock() + topic, ok := br.subscriptions[msg.Subscription] if !ok { err := &Error{ diff --git a/broker_test.go b/broker_test.go index d53807f..d428ee9 100644 --- a/broker_test.go +++ b/broker_test.go @@ -1,6 +1,7 @@ package turnpike import ( + "sync" "testing" . "github.com/smartystreets/goconvey/convey" @@ -9,13 +10,25 @@ import ( type TestPeer struct { received Message sent Message + + sync.RWMutex } func (s *TestPeer) Send(msg Message) error { + s.Lock() + defer s.Unlock() + s.received = msg return nil } +func (s *TestPeer) getReceived() Message { + s.RLock() + defer s.RUnlock() + + return s.received +} + // TODO: implement me func (s *TestPeer) Receive() <-chan Message { return nil } func (s *TestPeer) Close() error { return nil } diff --git a/data_races.txt b/data_races.txt new file mode 100644 index 0000000..d24dd8f --- /dev/null +++ b/data_races.txt @@ -0,0 +1,44 @@ +... +3 total assertions + +... +6 total assertions + +. +7 total assertions + +. +8 total assertions + +............... +23 total assertions + +....... +30 total assertions + +... +33 total assertions + +.......... +43 total assertions + +..... +48 total assertions + +.. +50 total assertions + +..... +55 total assertions + +...... +61 total assertions + +..... +66 total assertions + +... +69 total assertions + +PASS +ok gopkg.in/beatgammit/turnpike.v2 1.045s diff --git a/dealer.go b/dealer.go index dcfc81b..3758714 100644 --- a/dealer.go +++ b/dealer.go @@ -46,6 +46,8 @@ type defaultDealer struct { // single lock for all invocations; could use RWLock, but in most (all?) cases we want a write lock // TODO: add the lock per session invocationLock sync.Mutex + + sync.RWMutex } // NewDefaultDealer returns the default turnpike dealer implementation @@ -58,6 +60,9 @@ func NewDefaultDealer() Dealer { } func (d *defaultDealer) Register(sess *Session, msg *Register) { + d.Lock() + defer d.Unlock() + if id, ok := d.procedures[msg.Procedure]; ok { log.Println("error: procedure already exists:", msg.Procedure, id) sess.Peer.Send(&Error{ @@ -82,6 +87,9 @@ func (d *defaultDealer) Register(sess *Session, msg *Register) { } func (d *defaultDealer) Unregister(sess *Session, msg *Unregister) { + d.Lock() + defer d.Unlock() + if procedure, ok := d.registrations[msg.Registration]; !ok { // the registration doesn't exist log.Println("error: no such registration:", msg.Registration) @@ -103,6 +111,9 @@ func (d *defaultDealer) Unregister(sess *Session, msg *Unregister) { } func (d *defaultDealer) Call(sess *Session, msg *Call) { + d.Lock() + defer d.Unlock() + d.invocationLock.Lock() defer d.invocationLock.Unlock() @@ -136,6 +147,9 @@ func (d *defaultDealer) Call(sess *Session, msg *Call) { } func (d *defaultDealer) Yield(sess *Session, msg *Yield) { + d.Lock() + defer d.Unlock() + d.invocationLock.Lock() defer d.invocationLock.Unlock() diff --git a/dealer_test.go b/dealer_test.go index 49cb054..97fcaf8 100644 --- a/dealer_test.go +++ b/dealer_test.go @@ -84,7 +84,7 @@ func TestCall(t *testing.T) { dealer.Call(callerSession, msg) Convey("The caller should have received an ERROR message", func() { - err := caller.received.(*Error) + err := caller.getReceived().(*Error) So(err.Error, ShouldEqual, ErrNoSuchProcedure) So(err.Details, ShouldNotBeNil) }) @@ -106,9 +106,9 @@ func TestCall(t *testing.T) { time.Sleep(time.Millisecond) Convey("The caller should have received a RESULT message", func() { - So(caller.received, ShouldNotBeNil) - So(caller.received.MessageType(), ShouldEqual, RESULT) - So(caller.received.(*Result).Request, ShouldEqual, 125) + So(caller.getReceived(), ShouldNotBeNil) + So(caller.getReceived().MessageType(), ShouldEqual, RESULT) + So(caller.getReceived().(*Result).Request, ShouldEqual, 125) }) }) @@ -120,9 +120,9 @@ func TestCall(t *testing.T) { time.Sleep(time.Millisecond) Convey("The caller should have received an ERROR message", func() { - So(caller.received, ShouldNotBeNil) - So(caller.received.MessageType(), ShouldEqual, ERROR) - So(caller.received.(*Error).Request, ShouldEqual, 125) + So(caller.getReceived(), ShouldNotBeNil) + So(caller.getReceived().MessageType(), ShouldEqual, ERROR) + So(caller.getReceived().(*Error).Request, ShouldEqual, 125) }) }) }) diff --git a/realm.go b/realm.go index 433686b..3f48b2f 100644 --- a/realm.go +++ b/realm.go @@ -2,6 +2,7 @@ package turnpike import ( "fmt" + "sync" "time" "github.com/streamrail/concurrent-map" @@ -16,22 +17,25 @@ const ( // Clients that have connected to a WAMP router are joined to a realm and all // message delivery is handled by the realm. type Realm struct { - _ string - URI URI - Broker - Dealer - Authorizer - Interceptor + _ string + URI URI + Broker Broker + Dealer Dealer + Authorizer Authorizer + Interceptor Interceptor CRAuthenticators map[string]CRAuthenticator Authenticators map[string]Authenticator // DefaultAuth func(details map[string]interface{}) (map[string]interface{}, error) AuthTimeout time.Duration clients cmap.ConcurrentMap - localClient + localClient *localClient + + lock sync.RWMutex } type localClient struct { *Client + sync.Mutex } func (r *Realm) getPeer(details map[string]interface{}) (Peer, error) { @@ -58,9 +62,18 @@ func (r Realm) Close() { } func (r *Realm) init() { + r.lock.Lock() + defer r.lock.Unlock() + r.clients = cmap.New() - p, _ := r.getPeer(nil) - r.localClient.Client = NewClient(p) + + if r.localClient == nil { + p, _ := r.getPeer(nil) + client := NewClient(p) + r.localClient = new(localClient) + r.localClient.Client = client + } + if r.Broker == nil { r.Broker = NewDefaultBroker() } @@ -82,92 +95,109 @@ func (r *Realm) init() { // } func (l *localClient) onJoin(details map[string]interface{}) { + l.Lock() + defer l.Unlock() l.Publish("wamp.session.on_join", []interface{}{details}, nil) } func (l *localClient) onLeave(session ID) { + l.Lock() + defer l.Unlock() l.Publish("wamp.session.on_leave", []interface{}{session}, nil) } +func (r *Realm) doOne(c <-chan Message, sess *Session) bool { + r.lock.RLock() + defer r.lock.RUnlock() + + var msg Message + var open bool + select { + case msg, open = <-c: + if !open { + log.Println("lost session:", sess) + return false + } + case reason := <-sess.kill: + logErr(sess.Send(&Goodbye{Reason: reason, Details: make(map[string]interface{})})) + log.Printf("kill session %s: %v", sess, reason) + // TODO: wait for client Goodbye? + return false + } + + log.Printf("[%s] %s: %+v", sess, msg.MessageType(), msg) + if isAuthz, err := r.Authorizer.Authorize(sess, msg); !isAuthz { + errMsg := &Error{Type: msg.MessageType()} + if err != nil { + errMsg.Error = ErrAuthorizationFailed + log.Printf("[%s] authorization failed: %v", sess, err) + } else { + errMsg.Error = ErrNotAuthorized + log.Printf("[%s] %s UNAUTHORIZED", sess, msg.MessageType()) + } + logErr(sess.Send(errMsg)) + return true + } + + r.Interceptor.Intercept(sess, &msg) + + switch msg := msg.(type) { + case *Goodbye: + logErr(sess.Send(&Goodbye{Reason: ErrGoodbyeAndOut, Details: make(map[string]interface{})})) + log.Printf("[%s] leaving: %v", sess, msg.Reason) + return false + + // Broker messages + case *Publish: + r.Broker.Publish(sess, msg) + case *Subscribe: + r.Broker.Subscribe(sess, msg) + case *Unsubscribe: + r.Broker.Unsubscribe(sess, msg) + + // Dealer messages + case *Register: + r.Dealer.Register(sess, msg) + case *Unregister: + r.Dealer.Unregister(sess, msg) + case *Call: + r.Dealer.Call(sess, msg) + case *Yield: + r.Dealer.Yield(sess, msg) + + // Error messages + case *Error: + if msg.Type == INVOCATION { + // the only type of ERROR message the router should receive + r.Dealer.Error(sess, msg) + } else { + log.Printf("invalid ERROR message received: %v", msg) + } + + default: + log.Println("Unhandled message:", msg.MessageType()) + } + return true +} + func (r *Realm) handleSession(sess *Session) { + r.lock.RLock() r.clients.Set(string(sess.Id), sess) - r.onJoin(sess.Details) + r.localClient.onJoin(sess.Details) + r.lock.RUnlock() + defer func() { + r.lock.RLock() + defer r.lock.RUnlock() + r.clients.Remove(string(sess.Id)) r.Dealer.RemoveSession(sess) - r.onLeave(sess.Id) + r.localClient.onLeave(sess.Id) }() c := sess.Receive() // TODO: what happens if the realm is closed? - for { - var msg Message - var open bool - select { - case msg, open = <-c: - if !open { - log.Println("lost session:", sess) - return - } - case reason := <-sess.kill: - logErr(sess.Send(&Goodbye{Reason: reason, Details: make(map[string]interface{})})) - log.Printf("kill session %s: %v", sess, reason) - // TODO: wait for client Goodbye? - return - } - - log.Printf("[%s] %s: %+v", sess, msg.MessageType(), msg) - if isAuthz, err := r.Authorizer.Authorize(sess, msg); !isAuthz { - errMsg := &Error{Type: msg.MessageType()} - if err != nil { - errMsg.Error = ErrAuthorizationFailed - log.Printf("[%s] authorization failed: %v", sess, err) - } else { - errMsg.Error = ErrNotAuthorized - log.Printf("[%s] %s UNAUTHORIZED", sess, msg.MessageType()) - } - logErr(sess.Send(errMsg)) - continue - } - - r.Interceptor.Intercept(sess, &msg) - - switch msg := msg.(type) { - case *Goodbye: - logErr(sess.Send(&Goodbye{Reason: ErrGoodbyeAndOut, Details: make(map[string]interface{})})) - log.Printf("[%s] leaving: %v", sess, msg.Reason) - return - - // Broker messages - case *Publish: - r.Broker.Publish(sess, msg) - case *Subscribe: - r.Broker.Subscribe(sess, msg) - case *Unsubscribe: - r.Broker.Unsubscribe(sess, msg) - - // Dealer messages - case *Register: - r.Dealer.Register(sess, msg) - case *Unregister: - r.Dealer.Unregister(sess, msg) - case *Call: - r.Dealer.Call(sess, msg) - case *Yield: - r.Dealer.Yield(sess, msg) - - // Error messages - case *Error: - if msg.Type == INVOCATION { - // the only type of ERROR message the router should receive - r.Dealer.Error(sess, msg) - } else { - log.Printf("invalid ERROR message received: %v", msg) - } - - default: - log.Println("Unhandled message:", msg.MessageType()) - } + for r.doOne(c, sess) { } } diff --git a/router.go b/router.go index ee06c50..270ada5 100644 --- a/router.go +++ b/router.go @@ -4,6 +4,8 @@ import ( "fmt" "sync" "time" + + "github.com/streamrail/concurrent-map" ) var defaultWelcomeDetails = map[string]interface{}{ @@ -44,7 +46,7 @@ type Router interface { // DefaultRouter is the default WAMP router implementation. type defaultRouter struct { - realms map[URI]Realm + realms cmap.ConcurrentMap closing bool closeLock sync.Mutex sessionOpenCallbacks []func(uint, string) @@ -54,7 +56,7 @@ type defaultRouter struct { // NewDefaultRouter creates a very basic WAMP router. func NewDefaultRouter() Router { return &defaultRouter{ - realms: make(map[URI]Realm), + realms: cmap.New(), sessionOpenCallbacks: []func(uint, string){}, sessionCloseCallbacks: []func(uint, string){}, } @@ -76,18 +78,22 @@ func (r *defaultRouter) Close() error { } r.closing = true r.closeLock.Unlock() - for _, realm := range r.realms { - realm.Close() + for val := range r.realms.Iter() { + if realm, ok := val.Val.(*Realm); ok { + realm.Close() + } else { + log.Printf("defaultRouter.Close: expecting realm, found invalid type: %T", val.Val) + } } return nil } func (r *defaultRouter) RegisterRealm(uri URI, realm Realm) error { - if _, ok := r.realms[uri]; ok { + if _, ok := r.realms.Get(string(uri)); ok { return RealmExistsError(uri) } realm.init() - r.realms[uri] = realm + r.realms.Set(string(uri), &realm) log.Println("registered realm:", uri) return nil } @@ -112,8 +118,14 @@ func (r *defaultRouter) Accept(client Peer) error { return fmt.Errorf("protocol violation: expected HELLO, received %s", msg.MessageType()) } - realm, ok := r.realms[hello.Realm] - if !ok { + var realm *Realm + if val, ok := r.realms.Get(string(hello.Realm)); ok { + if realm, ok = val.(*Realm); !ok { + logErr(client.Send(&Abort{Reason: ErrNoSuchRealm})) + logErr(client.Close()) + return NoSuchRealmError(hello.Realm) + } + } else { logErr(client.Send(&Abort{Reason: ErrNoSuchRealm})) logErr(client.Close()) return NoSuchRealmError(hello.Realm) @@ -170,12 +182,14 @@ func (r *defaultRouter) Accept(client Peer) error { // GetLocalPeer returns an internal peer connected to the specified realm. func (r *defaultRouter) GetLocalPeer(realmURI URI, details map[string]interface{}) (Peer, error) { - realm, ok := r.realms[realmURI] - if !ok { + if val, ok := r.realms.Get(string(realmURI)); !ok { + return nil, NoSuchRealmError(realmURI) + } else if realm, ok := val.(*Realm); !ok { return nil, NoSuchRealmError(realmURI) + } else { + // TODO: session open/close callbacks? + return realm.getPeer(details) } - // TODO: session open/close callbacks? - return realm.getPeer(details) } func (r *defaultRouter) getTestPeer() Peer { From 7e3d305799ec421577b6e6e343d98f9fa7303ff9 Mon Sep 17 00:00:00 2001 From: "T. Jameson Little" <t.jameson.little@gmail.com> Date: Mon, 1 Aug 2016 13:36:24 -0600 Subject: [PATCH 10/25] Fix problems using the wrong types of ids in pub/sub - fixes problems with multiple subscribers to the same topic --- broker.go | 15 +++++++++++++-- broker_test.go | 37 ++++++++++++++++++++++++++++++++++++- 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/broker.go b/broker.go index b28827c..62e92f9 100644 --- a/broker.go +++ b/broker.go @@ -20,6 +20,8 @@ type defaultBroker struct { routes map[URI]map[ID]Sender subscriptions map[ID]URI + lastRequestId ID + sync.RWMutex } @@ -32,6 +34,15 @@ func NewDefaultBroker() Broker { } } +func (br *defaultBroker) nextRequestId() ID { + br.lastRequestId++ + if br.lastRequestId > MAX_REQUEST_ID { + br.lastRequestId = 1 + } + + return br.lastRequestId +} + // Publish sends a message to all subscribed clients except for the sender. // // If msg.Options["acknowledge"] == true, the publisher receives a Published event @@ -41,7 +52,7 @@ func (br *defaultBroker) Publish(sess *Session, msg *Publish) { defer br.RUnlock() pub := sess.Peer - pubID := sess.NextRequestId() + pubID := NewID() evtTemplate := Event{ Publication: pubID, Arguments: msg.Arguments, @@ -78,7 +89,7 @@ func (br *defaultBroker) Subscribe(sess *Session, msg *Subscribe) { if _, ok := br.routes[msg.Topic]; !ok { br.routes[msg.Topic] = make(map[ID]Sender) } - id := sess.NextRequestId() + id := br.nextRequestId() br.routes[msg.Topic][id] = sess.Peer br.subscriptions[id] = msg.Topic diff --git a/broker_test.go b/broker_test.go index d428ee9..aa75a83 100644 --- a/broker_test.go +++ b/broker_test.go @@ -53,8 +53,43 @@ func TestSubscribe(t *testing.T) { So(ok, ShouldBeTrue) So(topic, ShouldEqual, testTopic) }) + }) +} + +func TestBrokerNextRequestId(t *testing.T) { + Convey("nextRequestId called multiple times", t, func() { + broker := NewDefaultBroker().(*defaultBroker) + id1 := broker.nextRequestId() + id2 := broker.nextRequestId() + + So(id1, ShouldNotEqual, id2) + }) + + Convey("nextRequestId should roll over", t, func() { + broker := NewDefaultBroker().(*defaultBroker) + broker.lastRequestId = MAX_REQUEST_ID + id := broker.nextRequestId() + + So(id, ShouldEqual, 1) + }) +} - // TODO: multiple subscribe requests? +func TestMultipleSubscribe(t *testing.T) { + const SUBSCRIBERS = 2 + const TOPIC = URI("turnpike.test.topic") + + Convey("Multiple subscribers to a topic", t, func() { + broker := NewDefaultBroker().(*defaultBroker) + for i := 0; i < SUBSCRIBERS; i++ { + subscriber := &TestPeer{} + sess := &Session{Peer: subscriber, Id: NewID()} + msg := &Subscribe{Request: sess.NextRequestId(), Topic: TOPIC} + broker.Subscribe(sess, msg) + } + + Convey("There should be a map entry for each subscriber", func() { + So(len(broker.routes[TOPIC]), ShouldEqual, SUBSCRIBERS) + }) }) } From 405ff6929714d11c3d0aceab608fba727e397e91 Mon Sep 17 00:00:00 2001 From: "T. Jameson Little" <t.jameson.little@gmail.com> Date: Thu, 4 Aug 2016 09:49:26 -0600 Subject: [PATCH 11/25] Change all imports to my repo --- examples/auth/client/client.go | 2 +- examples/auth/server/server.go | 2 +- examples/autobahn/server/main.go | 2 +- examples/chat/chatclient/main.go | 2 +- examples/chat/chatserver/main.go | 2 +- examples/meta-api/meta-client.go | 2 +- examples/rpc/rpc-client/main.go | 2 +- examples/rpc/rpc-server/main.go | 2 +- examples/web/server.go | 2 +- turnpike/main.go | 2 +- 10 files changed, 10 insertions(+), 10 deletions(-) diff --git a/examples/auth/client/client.go b/examples/auth/client/client.go index 30a6e0a..2ae5944 100644 --- a/examples/auth/client/client.go +++ b/examples/auth/client/client.go @@ -9,7 +9,7 @@ import ( "time" "github.com/howeyc/gopass" - "gopkg.in/jcelliott/turnpike.v2" + "gopkg.in/beatgammit/turnpike.v2" ) var password []byte diff --git a/examples/auth/server/server.go b/examples/auth/server/server.go index 877aa4a..326a04a 100644 --- a/examples/auth/server/server.go +++ b/examples/auth/server/server.go @@ -9,7 +9,7 @@ import ( "net/http" "github.com/satori/go.uuid" - "gopkg.in/jcelliott/turnpike.v2" + "gopkg.in/beatgammit/turnpike.v2" ) // this is just an example, please don't actually use it diff --git a/examples/autobahn/server/main.go b/examples/autobahn/server/main.go index 8f9c40b..914f3fa 100644 --- a/examples/autobahn/server/main.go +++ b/examples/autobahn/server/main.go @@ -4,7 +4,7 @@ import ( "log" "net/http" - "gopkg.in/jcelliott/turnpike.v2" + "gopkg.in/beatgammit/turnpike.v2" ) func main() { diff --git a/examples/chat/chatclient/main.go b/examples/chat/chatclient/main.go index 1395f35..760fdf2 100644 --- a/examples/chat/chatclient/main.go +++ b/examples/chat/chatclient/main.go @@ -4,7 +4,7 @@ import ( "log" "os" - "gopkg.in/jcelliott/turnpike.v2" + "gopkg.in/beatgammit/turnpike.v2" ) const ( diff --git a/examples/chat/chatserver/main.go b/examples/chat/chatserver/main.go index 8f9c40b..914f3fa 100644 --- a/examples/chat/chatserver/main.go +++ b/examples/chat/chatserver/main.go @@ -4,7 +4,7 @@ import ( "log" "net/http" - "gopkg.in/jcelliott/turnpike.v2" + "gopkg.in/beatgammit/turnpike.v2" ) func main() { diff --git a/examples/meta-api/meta-client.go b/examples/meta-api/meta-client.go index bdd8310..0634d7c 100644 --- a/examples/meta-api/meta-client.go +++ b/examples/meta-api/meta-client.go @@ -3,7 +3,7 @@ package main import ( "log" - "gopkg.in/jcelliott/turnpike.v2" + "gopkg.in/beatgammit/turnpike.v2" ) func main() { diff --git a/examples/rpc/rpc-client/main.go b/examples/rpc/rpc-client/main.go index ff3516c..786c945 100644 --- a/examples/rpc/rpc-client/main.go +++ b/examples/rpc/rpc-client/main.go @@ -7,7 +7,7 @@ import ( "os" "strconv" - "gopkg.in/jcelliott/turnpike.v2" + "gopkg.in/beatgammit/turnpike.v2" ) func main() { diff --git a/examples/rpc/rpc-server/main.go b/examples/rpc/rpc-server/main.go index ff428b7..0e26b74 100644 --- a/examples/rpc/rpc-server/main.go +++ b/examples/rpc/rpc-server/main.go @@ -5,7 +5,7 @@ import ( "net/http" "time" - "gopkg.in/jcelliott/turnpike.v2" + "gopkg.in/beatgammit/turnpike.v2" ) var client *turnpike.Client diff --git a/examples/web/server.go b/examples/web/server.go index 0a92017..eddd74e 100644 --- a/examples/web/server.go +++ b/examples/web/server.go @@ -4,7 +4,7 @@ import ( "log" "net/http" - "gopkg.in/jcelliott/turnpike.v2" + "gopkg.in/beatgammit/turnpike.v2" ) func main() { diff --git a/turnpike/main.go b/turnpike/main.go index b11fc68..58c1b26 100644 --- a/turnpike/main.go +++ b/turnpike/main.go @@ -8,7 +8,7 @@ import ( "os/signal" "time" - "gopkg.in/jcelliott/turnpike.v2" + "gopkg.in/beatgammit/turnpike.v2" ) var ( From c88b89f52b388a5f4e5b9f2f00c472a87388419e Mon Sep 17 00:00:00 2001 From: "T. Jameson Little" <t.jameson.little@gmail.com> Date: Thu, 4 Aug 2016 09:53:24 -0600 Subject: [PATCH 12/25] Changed most references to old repo --- README.md | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 80d02a7..9cdd0c5 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -Turnpike [![Build Status](https://drone.io/github.com/jcelliott/turnpike/status.png)](https://drone.io/github.com/jcelliott/turnpike/latest) [![Coverage Status](https://coveralls.io/repos/jcelliott/turnpike/badge.svg?branch=v2)](https://coveralls.io/r/jcelliott/turnpike?branch=v2) [![GoDoc](https://godoc.org/gopkg.in/jcelliott/turnpike?status.svg)](http://godoc.org/gopkg.in/jcelliott/turnpike.v2) +Turnpike [![Build Status](https://drone.io/github.com/jcelliott/turnpike/status.png)](https://drone.io/github.com/jcelliott/turnpike/latest) [![Coverage Status](https://coveralls.io/repos/jcelliott/turnpike/badge.svg?branch=v2)](https://coveralls.io/r/jcelliott/turnpike?branch=v2) [![GoDoc](https://godoc.org/gopkg.in/beatgammit/turnpike?status.svg)](http://godoc.org/gopkg.in/beatgammit/turnpike.v2) === Go implementation of [WAMP](http://wamp.ws/) - The Web Application Messaging Protocol @@ -18,25 +18,23 @@ basic stand-alone router. The router library can be used to embed a WAMP router in another application, or to build a custom router implementation. The client library can be used to communicate with any WAMP router. -This version of Turnpike supports WAMP v2. For WAMP v1 support see the [v1 branch](https://github.com/jcelliott/turnpike/tree/v1). - Status --- Turnpike v2 is still under development, but is getting close to a stable release. If you have any feedback or suggestions, please -[open an issue](https://github.com/jcelliott/turnpike/issues/new). +[open an issue](https://github.com/beatgammit/turnpike/issues/new). Installation --- Library: - go get -u gopkg.in/jcelliott/turnpike.v2 + go get -u gopkg.in/beatgammit/turnpike.v2 Stand-alone router: - go get -u gopkg.in/jcelliott/turnpike.v2/turnpike + go get -u gopkg.in/beatgammit/turnpike.v2/turnpike Client library usage --- @@ -56,7 +54,7 @@ import ( "log" "net/http" - "gopkg.in/jcelliott/turnpike.v2" + "gopkg.in/beatgammit/turnpike.v2" ) func main() { From ae44bc4bc4a440bb3ff1c3ccfc2b801104394ee2 Mon Sep 17 00:00:00 2001 From: "T. Jameson Little" <t.jameson.little@gmail.com> Date: Thu, 4 Aug 2016 09:55:31 -0600 Subject: [PATCH 13/25] Fix compile errors in examples - no guarantee they still work, but this makes it easier to run tests --- examples/auth/client/client.go | 9 +++++++-- examples/chat/chatclient/main.go | 2 +- examples/meta-api/meta-client.go | 2 +- examples/rpc/rpc-client/main.go | 2 +- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/examples/auth/client/client.go b/examples/auth/client/client.go index 2ae5944..e102c24 100644 --- a/examples/auth/client/client.go +++ b/examples/auth/client/client.go @@ -29,9 +29,14 @@ func main() { turnpike.Debug() fmt.Println("Hint: the password is 'password'") fmt.Print("Password: ") - password = gopass.GetPasswd() - c, err := turnpike.NewWebsocketClient(turnpike.JSON, "ws://localhost:8000/ws") + var err error + password, err = gopass.GetPasswd() + if err != nil { + log.Fatal("Error getting the password:", err) + } + + c, err := turnpike.NewWebsocketClient(turnpike.JSON, "ws://localhost:8000/ws", nil) if err != nil { log.Fatal(err) } diff --git a/examples/chat/chatclient/main.go b/examples/chat/chatclient/main.go index 760fdf2..fd219c4 100644 --- a/examples/chat/chatclient/main.go +++ b/examples/chat/chatclient/main.go @@ -36,7 +36,7 @@ func main() { username := os.Args[1] // turnpike.Debug() - c, err := turnpike.NewWebsocketClient(turnpike.JSON, "ws://localhost:8000/") + c, err := turnpike.NewWebsocketClient(turnpike.JSON, "ws://localhost:8000/", nil) if err != nil { log.Fatal(err) } diff --git a/examples/meta-api/meta-client.go b/examples/meta-api/meta-client.go index 0634d7c..b71e513 100644 --- a/examples/meta-api/meta-client.go +++ b/examples/meta-api/meta-client.go @@ -7,7 +7,7 @@ import ( ) func main() { - c, err := turnpike.NewWebsocketClient(turnpike.JSON, "ws://localhost:8000/") + c, err := turnpike.NewWebsocketClient(turnpike.JSON, "ws://localhost:8000/", nil) if err != nil { log.Fatal(err) } diff --git a/examples/rpc/rpc-client/main.go b/examples/rpc/rpc-client/main.go index 786c945..66ef4f7 100644 --- a/examples/rpc/rpc-client/main.go +++ b/examples/rpc/rpc-client/main.go @@ -12,7 +12,7 @@ import ( func main() { turnpike.Debug() - c, err := turnpike.NewWebsocketClient(turnpike.JSON, "ws://localhost:8000/") + c, err := turnpike.NewWebsocketClient(turnpike.JSON, "ws://localhost:8000/", nil) if err != nil { log.Fatal(err) } From 4b399d494b1a71ac0cbfa034a7c5f377c1b0763f Mon Sep 17 00:00:00 2001 From: "T. Jameson Little" <t.jameson.little@gmail.com> Date: Thu, 4 Aug 2016 10:24:47 -0600 Subject: [PATCH 14/25] Added locks to Client - single rw lock to make eliminate races, could definitely use some performance tuning/refactoring --- client.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/client.go b/client.go index 3120f32..3052184 100644 --- a/client.go +++ b/client.go @@ -3,6 +3,7 @@ package turnpike import ( "crypto/tls" "fmt" + "sync" "time" ) @@ -38,6 +39,8 @@ type Client struct { events map[ID]*eventDesc procedures map[ID]*procedureDesc requestCount uint + + lock sync.RWMutex } type procedureDesc struct { @@ -212,11 +215,13 @@ func (c *Client) Receive() { switch msg := msg.(type) { case *Event: + c.lock.RLock() if event, ok := c.events[msg.Subscription]; ok { go event.handler(msg.Arguments, msg.ArgumentsKw) } else { log.Println("no handler registered for subscription:", msg.Subscription) } + c.lock.RUnlock() case *Invocation: c.handleInvocation(msg) @@ -251,6 +256,8 @@ func (c *Client) Receive() { func (c *Client) notifyListener(msg Message, requestID ID) { // pass in the request ID so we don't have to do any type assertion + c.lock.RLock() + defer c.lock.RUnlock() if l, ok := c.listeners[requestID]; ok { l <- msg } else { @@ -259,7 +266,9 @@ func (c *Client) notifyListener(msg Message, requestID ID) { } func (c *Client) handleInvocation(msg *Invocation) { + c.lock.RLock() if proc, ok := c.procedures[msg.Registration]; ok { + c.lock.RUnlock() go func() { result := proc.handler(msg.Arguments, msg.ArgumentsKw, msg.Details) @@ -287,6 +296,7 @@ func (c *Client) handleInvocation(msg *Invocation) { } }() } else { + c.lock.RUnlock() log.Println("no handler registered for registration:", msg.Registration) if err := c.Send(&Error{ Type: INVOCATION, @@ -302,12 +312,16 @@ func (c *Client) handleInvocation(msg *Invocation) { func (c *Client) registerListener(id ID) { log.Println("register listener:", id) wait := make(chan Message, 1) + c.lock.Lock() + defer c.lock.Unlock() c.listeners[id] = wait } func (c *Client) waitOnListener(id ID) (msg Message, err error) { log.Println("wait on listener:", id) + c.lock.RLock() wait, ok := c.listeners[id] + c.lock.RUnlock() if !ok { return nil, fmt.Errorf("unknown listener ID: %v", id) } @@ -345,6 +359,8 @@ func (c *Client) Subscribe(topic string, fn EventHandler) error { return fmt.Errorf(formatUnexpectedMessage(msg, SUBSCRIBED)) } else { // register the event handler with this subscription + c.lock.Lock() + defer c.lock.Unlock() c.events[subscribed.Subscription] = &eventDesc{topic, fn} } return nil @@ -356,12 +372,14 @@ func (c *Client) Unsubscribe(topic string) error { subscriptionID ID found bool ) + c.lock.RLock() for id, desc := range c.events { if desc.topic == topic { subscriptionID = id found = true } } + c.lock.RUnlock() if !found { return fmt.Errorf("Event %s is not registered with this client.", topic) } @@ -384,6 +402,9 @@ func (c *Client) Unsubscribe(topic string) error { } else if _, ok := msg.(*Unsubscribed); !ok { return fmt.Errorf(formatUnexpectedMessage(msg, UNSUBSCRIBED)) } + + c.lock.Lock() + defer c.lock.Unlock() delete(c.events, subscriptionID) return nil } @@ -416,6 +437,8 @@ func (c *Client) Register(procedure string, fn MethodHandler, options map[string return fmt.Errorf(formatUnexpectedMessage(msg, REGISTERED)) } else { // register the event handler with this registration + c.lock.Lock() + defer c.lock.Unlock() c.procedures[registered.Registration] = &procedureDesc{procedure, fn} } return nil @@ -439,12 +462,14 @@ func (c *Client) Unregister(procedure string) error { procedureID ID found bool ) + c.lock.RLock() for id, p := range c.procedures { if p.name == procedure { procedureID = id found = true } } + c.lock.RUnlock() if !found { return fmt.Errorf("Procedure %s is not registered with this client.", procedure) } @@ -468,6 +493,8 @@ func (c *Client) Unregister(procedure string) error { return fmt.Errorf(formatUnexpectedMessage(msg, UNREGISTERED)) } // register the event handler with this unregistration + c.lock.Lock() + defer c.lock.Unlock() delete(c.procedures, procedureID) return nil } From c1cf8e07d75d9fcae432ac910af39e70cb708bda Mon Sep 17 00:00:00 2001 From: "T. Jameson Little" <t.jameson.little@gmail.com> Date: Thu, 4 Aug 2016 10:45:14 -0600 Subject: [PATCH 15/25] Added more locks for unlikely, but still possible data races --- dealer.go | 4 +++- websocket_server.go | 9 +++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/dealer.go b/dealer.go index 3758714..d6d6fea 100644 --- a/dealer.go +++ b/dealer.go @@ -210,8 +210,10 @@ func (d *defaultDealer) Error(sess *Session, msg *Error) { } func (d *defaultDealer) RemoveSession(sess *Session) { + d.Lock() + defer d.Unlock() + // TODO: this is low hanging fruit for optimization - // TODO: potential data race here, should lock the map for _, rproc := range d.procedures { if rproc.Endpoint == sess { delete(d.registrations, rproc.Registration) diff --git a/websocket_server.go b/websocket_server.go index c04414c..93a6ae5 100644 --- a/websocket_server.go +++ b/websocket_server.go @@ -3,6 +3,7 @@ package turnpike import ( "fmt" "net/http" + "sync" "github.com/gorilla/websocket" ) @@ -40,6 +41,8 @@ type WebsocketServer struct { TextSerializer Serializer // The serializer to use for binary frames. Defaults to JSONSerializer. BinarySerializer Serializer + + lock sync.RWMutex } // NewWebsocketServer creates a new WebsocketServer from a map of realms @@ -79,6 +82,8 @@ func (s *WebsocketServer) RegisterProtocol(proto string, payloadType int, serial if payloadType != websocket.TextMessage && payloadType != websocket.BinaryMessage { return invalidPayload(payloadType) } + s.lock.Lock() + defer s.lock.Unlock() if _, ok := s.protocols[proto]; ok { return protocolExists(proto) } @@ -114,10 +119,14 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (s *WebsocketServer) handleWebsocket(conn *websocket.Conn) { var serializer Serializer var payloadType int + s.lock.RLock() if proto, ok := s.protocols[conn.Subprotocol()]; ok { + s.lock.RUnlock() serializer = proto.serializer payloadType = proto.payloadType } else { + s.lock.RUnlock() + // TODO: this will not currently ever be hit because // gorilla/websocket will reject the conncetion // if the subprotocol isn't registered From 71f77d3803ea2f587930fd6c10c80c0cc08813bf Mon Sep 17 00:00:00 2001 From: "T. Jameson Little" <t.jameson.little@gmail.com> Date: Fri, 19 Aug 2016 16:10:40 -0600 Subject: [PATCH 16/25] If the websocket is closed, don't close it again - it seems like this was blocking when the socket was already closed, which caused some stability issues over time as goroutines built up --- websocket.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/websocket.go b/websocket.go index 64a6f3f..06e44ef 100644 --- a/websocket.go +++ b/websocket.go @@ -86,8 +86,11 @@ func (ep *websocketPeer) run() { log.Println("peer connection closed") } else { log.Println("error reading from peer:", err) - ep.conn.Close() + if !websocket.IsCloseError(err) { + ep.conn.Close() + } } + log.Println("Closing channel") close(ep.messages) break } else if msgType == websocket.CloseMessage { From afcecc366d50a10094931daf0240fe674f049375 Mon Sep 17 00:00:00 2001 From: "T. Jameson Little" <t.jameson.little@gmail.com> Date: Mon, 22 Aug 2016 16:14:52 -0600 Subject: [PATCH 17/25] Close session in realm --- realm.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/realm.go b/realm.go index 3f48b2f..d5241d6 100644 --- a/realm.go +++ b/realm.go @@ -44,7 +44,10 @@ func (r *Realm) getPeer(details map[string]interface{}) (Peer, error) { if details == nil { details = make(map[string]interface{}) } - go r.handleSession(sess) + go func() { + r.handleSession(sess) + sess.Close() + }() log.Println("Established internal session:", sess) return peerB, nil } From 00ecc285603cad92a814384e743bbd53ccb65257 Mon Sep 17 00:00:00 2001 From: "T. Jameson Little" <t.jameson.little@gmail.com> Date: Thu, 25 Aug 2016 09:53:02 -0600 Subject: [PATCH 18/25] Send messages in a goroutine - I noticed a that defaultBroker.Subscribe could block, and the only way that could happen is if the Send function in either Unsubscribe (unlikely) or Publish (very likely) blocked - I think there's still a larger problem with transports blocking on the Send, but this should at least allow messages to go through if that happens --- broker.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/broker.go b/broker.go index 62e92f9..c62b1e9 100644 --- a/broker.go +++ b/broker.go @@ -71,13 +71,13 @@ func (br *defaultBroker) Publish(sess *Session, msg *Publish) { event.Subscription = id // don't send event to publisher if sub != pub || !excludePublisher { - sub.Send(&event) + go sub.Send(&event) } } // only send published message if acknowledge is present and set to true if doPub, _ := msg.Options["acknowledge"].(bool); doPub { - pub.Send(&Published{Request: msg.Request, Publication: pubID}) + go pub.Send(&Published{Request: msg.Request, Publication: pubID}) } } @@ -94,7 +94,7 @@ func (br *defaultBroker) Subscribe(sess *Session, msg *Subscribe) { br.subscriptions[id] = msg.Topic - sess.Peer.Send(&Subscribed{Request: msg.Request, Subscription: id}) + go sess.Peer.Send(&Subscribed{Request: msg.Request, Subscription: id}) } func (br *defaultBroker) Unsubscribe(sess *Session, msg *Unsubscribe) { @@ -124,5 +124,5 @@ func (br *defaultBroker) Unsubscribe(sess *Session, msg *Unsubscribe) { delete(br.routes, topic) } } - sess.Peer.Send(&Unsubscribed{Request: msg.Request}) + go sess.Peer.Send(&Unsubscribed{Request: msg.Request}) } From 4a8e170138a4d1c2ed9aa4c978f30737b1bb5abb Mon Sep 17 00:00:00 2001 From: Aaron Shumway <aclshumway@gmail.com> Date: Thu, 25 Aug 2016 16:34:37 -0600 Subject: [PATCH 19/25] Added sendMsgs channel to send websocket messages without blocking. Also added a write timeout. --- websocket.go | 170 ++++++++++++++++++++++++++++++++++++++++---- websocket_server.go | 25 +++++-- 2 files changed, 177 insertions(+), 18 deletions(-) diff --git a/websocket.go b/websocket.go index 06e44ef..0a31a34 100644 --- a/websocket.go +++ b/websocket.go @@ -2,6 +2,7 @@ package turnpike import ( "crypto/tls" + "errors" "fmt" "sync" "time" @@ -9,13 +10,22 @@ import ( "github.com/gorilla/websocket" ) +// errors. +var ( + ErrWSSendTimeout = errors.New("ws peer send timeout") + ErrWSIsClosed = errors.New("ws peer is closed") +) + type websocketPeer struct { conn *websocket.Conn serializer Serializer + sendMsgs chan Message messages chan Message payloadType int - closed bool mutex sync.Mutex + inSending chan struct{} + closing chan struct{} + *ConnectionConfig } // NewWebsocketPeer connects to the websocket server at the specified url. @@ -44,45 +54,113 @@ func newWebsocketPeer(url, protocol, origin string, serializer Serializer, paylo return nil, err } ep := &websocketPeer{ - conn: conn, - messages: make(chan Message, 100), - serializer: serializer, - payloadType: payloadType, + conn: conn, + sendMsgs: make(chan Message, 16), + messages: make(chan Message, 100), + serializer: serializer, + payloadType: payloadType, + closing: make(chan struct{}), + ConnectionConfig: &ConnectionConfig{}, } go ep.run() return ep, nil } -// TODO: make this just add the message to a channel so we don't block func (ep *websocketPeer) Send(msg Message) error { - b, err := ep.serializer.Serialize(msg) - if err != nil { - return err + select { + case ep.sendMsgs <- msg: + return nil + case <-time.After(5 * time.Second): + log.Println(ErrWSSendTimeout.Error()) + ep.Close() + return ErrWSSendTimeout + case <-ep.closing: + log.Println(ErrWSIsClosed.Error()) + return ErrWSIsClosed } - ep.mutex.Lock() - defer ep.mutex.Unlock() - return ep.conn.WriteMessage(ep.payloadType, b) } + func (ep *websocketPeer) Receive() <-chan Message { return ep.messages } + +func (ep *websocketPeer) doClosing() { + ep.mutex.Lock() + defer ep.mutex.Unlock() + + select { + case <-ep.closing: + default: + close(ep.closing) + } +} + +func (ep *websocketPeer) isClosed() bool { + select { + case <-ep.closing: + return true + default: + return false + } +} + func (ep *websocketPeer) Close() error { + if ep.isClosed() { + return nil + } + ep.doClosing() + + if ep.inSending != nil { + <-ep.inSending + } + closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "goodbye") err := ep.conn.WriteControl(websocket.CloseMessage, closeMsg, time.Now().Add(5*time.Second)) if err != nil { log.Println("error sending close message:", err) } - ep.closed = true + return ep.conn.Close() } +func (ep *websocketPeer) updateReadDeadline() { + ep.mutex.Lock() + defer ep.mutex.Unlock() + if ep.IdleTimeout > 0 { + ep.conn.SetReadDeadline(time.Now().Add(ep.IdleTimeout)) + } +} + +func (ep *websocketPeer) setReadDead() { + ep.mutex.Lock() + defer ep.mutex.Unlock() + ep.conn.SetReadDeadline(time.Now()) +} + func (ep *websocketPeer) run() { + go ep.sending() + + if ep.MaxMsgSize > 0 { + ep.conn.SetReadLimit(ep.MaxMsgSize) + } + ep.conn.SetPongHandler(func(v string) error { + log.Println("pong:", v) + ep.updateReadDeadline() + return nil + }) + ep.conn.SetPingHandler(func(v string) error { + log.Println("ping:", v) + ep.updateReadDeadline() + return nil + }) + for { // TODO: use conn.NextMessage() and stream // TODO: do something different based on binary/text frames + ep.updateReadDeadline() if msgType, b, err := ep.conn.ReadMessage(); err != nil { - if ep.closed { + if ep.isClosed() { log.Println("peer connection closed") } else { log.Println("error reading from peer:", err) @@ -108,3 +186,67 @@ func (ep *websocketPeer) run() { } } } + +func (ep *websocketPeer) sending() { + ep.inSending = make(chan struct{}) + var ticker *time.Ticker + if ep.PingTimeout == 0 { + ticker = time.NewTicker(7 * 24 * time.Hour) + } else { + ticker = time.NewTicker(ep.PingTimeout) + } + + defer func() { + ep.setReadDead() + ticker.Stop() + close(ep.inSending) + }() + + for { + select { + case msg := <-ep.sendMsgs: + if closed, _ := ep.doSend(msg); closed { + return + } + case <-ticker.C: + wt := ep.WriteTimeout + if wt == 0 { + wt = 10 * time.Second + } + if err := ep.conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(wt)); err != nil { + log.Println("error sending ping message:", err) + return + } + case <-ep.closing: + // sending remaining messages. + for { + select { + case msg := <-ep.sendMsgs: + if closed, _ := ep.doSend(msg); !closed { + continue + } + default: + } + break + } + return + } + ep.updateReadDeadline() + } +} + +func (ep *websocketPeer) doSend(msg Message) (closed bool, err error) { + b, err := ep.serializer.Serialize(msg) + if err != nil { + log.Printf("error serializing peer message: %s, %+v", err, msg) + return true, err + } + if ep.WriteTimeout > 0 { + ep.conn.SetWriteDeadline(time.Now().Add(ep.WriteTimeout)) + } + if err = ep.conn.WriteMessage(ep.payloadType, b); err != nil { + log.Println("error write message: ", err) + return true, err + } + return +} diff --git a/websocket_server.go b/websocket_server.go index 93a6ae5..246b7cc 100644 --- a/websocket_server.go +++ b/websocket_server.go @@ -4,6 +4,7 @@ import ( "fmt" "net/http" "sync" + "time" "github.com/gorilla/websocket" ) @@ -30,6 +31,14 @@ type protocol struct { serializer Serializer } +// ConnectionConfig is the configs of a connection. +type ConnectionConfig struct { + MaxMsgSize int64 + WriteTimeout time.Duration + PingTimeout time.Duration + IdleTimeout time.Duration +} + // WebsocketServer handles websocket connections. type WebsocketServer struct { Router @@ -41,6 +50,7 @@ type WebsocketServer struct { TextSerializer Serializer // The serializer to use for binary frames. Defaults to JSONSerializer. BinarySerializer Serializer + ConnectionConfig lock sync.RWMutex } @@ -69,6 +79,10 @@ func newWebsocketServer(r Router) *WebsocketServer { s := &WebsocketServer{ Router: r, protocols: make(map[string]protocol), + ConnectionConfig: ConnectionConfig{ + // PingTimeout: 3 * time.Minute, + WriteTimeout: 10 * time.Second, + }, } s.Upgrader = &websocket.Upgrader{} s.RegisterProtocol(jsonWebsocketProtocol, websocket.TextMessage, new(JSONSerializer)) @@ -144,10 +158,13 @@ func (s *WebsocketServer) handleWebsocket(conn *websocket.Conn) { } peer := websocketPeer{ - conn: conn, - serializer: serializer, - messages: make(chan Message, 100), - payloadType: payloadType, + conn: conn, + serializer: serializer, + sendMsgs: make(chan Message, 16), + messages: make(chan Message, 100), + payloadType: payloadType, + closing: make(chan struct{}), + ConnectionConfig: &s.ConnectionConfig, } go peer.run() From 90945ea40fd9820e28d60949e20ab74bda70d157 Mon Sep 17 00:00:00 2001 From: Aaron Shumway <aclshumway@gmail.com> Date: Thu, 25 Aug 2016 16:51:41 -0600 Subject: [PATCH 20/25] remove session subscriptions when session ends --- broker.go | 62 +++++++++++++++++++++++++++++++++++++++++++++++-------- realm.go | 1 + 2 files changed, 54 insertions(+), 9 deletions(-) diff --git a/broker.go b/broker.go index c62b1e9..1bc60c6 100644 --- a/broker.go +++ b/broker.go @@ -13,12 +13,15 @@ type Broker interface { Subscribe(*Session, *Subscribe) // Unsubscribes from messages on a URI. Unsubscribe(*Session, *Unsubscribe) + // Remove all of session's subscriptions. + RemoveSession(*Session) } // A super simple broker that matches URIs to Subscribers. type defaultBroker struct { routes map[URI]map[ID]Sender subscriptions map[ID]URI + subscribers map[*Session][]ID lastRequestId ID @@ -31,6 +34,7 @@ func NewDefaultBroker() Broker { return &defaultBroker{ routes: make(map[URI]map[ID]Sender), subscriptions: make(map[ID]URI), + subscribers: make(map[*Session][]ID), } } @@ -91,38 +95,78 @@ func (br *defaultBroker) Subscribe(sess *Session, msg *Subscribe) { } id := br.nextRequestId() br.routes[msg.Topic][id] = sess.Peer - br.subscriptions[id] = msg.Topic + // subscribers + ids, ok := br.subscribers[sess] + if !ok { + ids = []ID{} + } + ids = append(ids, id) + br.subscribers[sess] = ids + go sess.Peer.Send(&Subscribed{Request: msg.Request, Subscription: id}) } +func (br *defaultBroker) RemoveSession(sess *Session) { + log.Printf("broker remove peer %p", sess) + br.Lock() + defer br.Unlock() + + for _, id := range br.subscribers[sess] { + br.unsubscribe(sess, id) + } +} + func (br *defaultBroker) Unsubscribe(sess *Session, msg *Unsubscribe) { br.Lock() defer br.Unlock() - topic, ok := br.subscriptions[msg.Subscription] - if !ok { + if !br.unsubscribe(sess, msg.Subscription) { err := &Error{ Type: msg.MessageType(), Request: msg.Request, Error: ErrNoSuchSubscription, } - sess.Peer.Send(err) + go sess.Peer.Send(err) log.Printf("Error unsubscribing: no such subscription %v", msg.Subscription) return } - delete(br.subscriptions, msg.Subscription) + + go sess.Peer.Send(&Unsubscribed{Request: msg.Request}) +} + +func (br *defaultBroker) unsubscribe(sess *Session, id ID) bool { + log.Printf("broker unsubscribing: %p, %d", &sess, id) + topic, ok := br.subscriptions[id] + if !ok { + return false + } + delete(br.subscriptions, id) if r, ok := br.routes[topic]; !ok { log.Printf("Error unsubscribing: unable to find routes for %s topic", topic) - } else if _, ok := r[msg.Subscription]; !ok { - log.Printf("Error unsubscribing: %s route does not exist for %v subscription", topic, msg.Subscription) + } else if _, ok := r[id]; !ok { + log.Printf("Error unsubscribing: %s route does not exist for %v subscription", topic, id) } else { - delete(r, msg.Subscription) + delete(r, id) if len(r) == 0 { delete(br.routes, topic) } } - go sess.Peer.Send(&Unsubscribed{Request: msg.Request}) + + // subscribers + ids := br.subscribers[sess][:0] + for _, id := range br.subscribers[sess] { + if id != id { + ids = append(ids, id) + } + } + if len(ids) == 0 { + delete(br.subscribers, sess) + } else { + br.subscribers[sess] = ids + } + + return true } diff --git a/realm.go b/realm.go index d5241d6..f6454ff 100644 --- a/realm.go +++ b/realm.go @@ -194,6 +194,7 @@ func (r *Realm) handleSession(sess *Session) { defer r.lock.RUnlock() r.clients.Remove(string(sess.Id)) + r.Broker.RemoveSession(sess) r.Dealer.RemoveSession(sess) r.localClient.onLeave(sess.Id) }() From 20397bde7616ddf19a865519713aca7ae14c4b61 Mon Sep 17 00:00:00 2001 From: Jameson Little <t.jameson.little@gmail.com> Date: Mon, 31 Oct 2016 10:53:37 -0600 Subject: [PATCH 21/25] Clean up client listeners (#2) - fixes memory leak --- client.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/client.go b/client.go index 3052184..492176a 100644 --- a/client.go +++ b/client.go @@ -317,6 +317,11 @@ func (c *Client) registerListener(id ID) { c.listeners[id] = wait } +func (c *Client) unregisterListener(id ID) { + log.Println("unregister listener:", id) + delete(c.listeners, id) +} + func (c *Client) waitOnListener(id ID) (msg Message, err error) { log.Println("wait on listener:", id) c.lock.RLock() @@ -341,6 +346,9 @@ type EventHandler func(args []interface{}, kwargs map[string]interface{}) func (c *Client) Subscribe(topic string, fn EventHandler) error { id := NewID() c.registerListener(id) + // TODO: figure out where to clean this up + // defer c.unregisterListener(id) + sub := &Subscribe{ Request: id, Options: make(map[string]interface{}), @@ -386,6 +394,8 @@ func (c *Client) Unsubscribe(topic string) error { id := NewID() c.registerListener(id) + defer c.unregisterListener(id) + sub := &Unsubscribe{ Request: id, Subscription: subscriptionID, @@ -418,6 +428,9 @@ type MethodHandler func( func (c *Client) Register(procedure string, fn MethodHandler, options map[string]interface{}) error { id := NewID() c.registerListener(id) + // TODO: figure out where to clean this up + // defer c.unregisterListener(id) + register := &Register{ Request: id, Options: options, @@ -475,6 +488,8 @@ func (c *Client) Unregister(procedure string) error { } id := NewID() c.registerListener(id) + defer c.unregisterListener(id) + unregister := &Unregister{ Request: id, Registration: procedureID, @@ -523,6 +538,7 @@ func (rpc RPCError) Error() string { func (c *Client) Call(procedure string, args []interface{}, kwargs map[string]interface{}) (*Result, error) { id := NewID() c.registerListener(id) + defer c.unregisterListener(id) call := &Call{ Request: id, From 4a4b73895f34dbafb57072896d3f75542dbc684b Mon Sep 17 00:00:00 2001 From: "T. Jameson Little" <t.jameson.little@gmail.com> Date: Mon, 31 Oct 2016 10:55:13 -0600 Subject: [PATCH 22/25] Fix data race panic --- client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/client.go b/client.go index 492176a..2cfbd75 100644 --- a/client.go +++ b/client.go @@ -318,6 +318,9 @@ func (c *Client) registerListener(id ID) { } func (c *Client) unregisterListener(id ID) { + c.lock.Lock() + defer c.lock.Unlock() + log.Println("unregister listener:", id) delete(c.listeners, id) } From a8e7cbce2ba84230eab0579be1874bdff8cd1235 Mon Sep 17 00:00:00 2001 From: "T. Jameson Little" <t.jameson.little@gmail.com> Date: Mon, 16 Jan 2017 16:00:55 -0700 Subject: [PATCH 23/25] Recover from send on closed channel --- session.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/session.go b/session.go index 0912bce..38ffc7d 100644 --- a/session.go +++ b/session.go @@ -59,12 +59,21 @@ func (s *localPeer) Receive() <-chan Message { return s.incoming } -func (s *localPeer) Send(msg Message) error { +func (s *localPeer) Send(msg Message) (err error) { + defer func() { + // just in case Close is called before Send + if r := recover(); r != nil { + err = fmt.Errorf("Attempt to write after Close()") + } + }() s.outgoing <- msg - return nil + return } func (s *localPeer) Close() error { - close(s.outgoing) + if s.outgoing != nil { + close(s.outgoing) + s.outgoing = nil + } return nil } From aaa21ce8fe51ce110e43204dc642e765e34e7611 Mon Sep 17 00:00:00 2001 From: "T. Jameson Little" <t.jameson.little@gmail.com> Date: Thu, 8 Jun 2017 11:07:25 -0600 Subject: [PATCH 24/25] Increase receive timeout to 30 seconds This should probably be easier to set. --- client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client.go b/client.go index 2cfbd75..2fdf9f8 100644 --- a/client.go +++ b/client.go @@ -67,7 +67,7 @@ func NewWebsocketClient(serialization Serialization, url string, tlscfg *tls.Con func NewClient(p Peer) *Client { c := &Client{ Peer: p, - ReceiveTimeout: 10 * time.Second, + ReceiveTimeout: 30 * time.Second, listeners: make(map[ID]chan Message), events: make(map[ID]*eventDesc), procedures: make(map[ID]*procedureDesc), From 573f579df7ee6b7f2962625d35a624d11222f2ec Mon Sep 17 00:00:00 2001 From: Aaron Shumway <aclshumway@gmail.com> Date: Wed, 6 Sep 2017 09:21:25 -0600 Subject: [PATCH 25/25] Manually close connection on unexpected errors --- websocket.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/websocket.go b/websocket.go index 0a31a34..967949a 100644 --- a/websocket.go +++ b/websocket.go @@ -164,7 +164,8 @@ func (ep *websocketPeer) run() { log.Println("peer connection closed") } else { log.Println("error reading from peer:", err) - if !websocket.IsCloseError(err) { + // only expected errors seem to close the connection, so close on any unexpected errors + if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { ep.conn.Close() } }