From 76799714b377bdb27bfd087ca30ca16ec0de3849 Mon Sep 17 00:00:00 2001 From: Tom Bevan Date: Wed, 30 Jan 2019 18:22:28 +0000 Subject: [PATCH 1/7] =?UTF-8?q?corrected=20bad=20stream=20writes=20and=20i?= =?UTF-8?q?ncreased=20logging=C2=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- auth.go | 3 ++- build.go | 3 +-- handler.go | 13 +++++++++---- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/auth.go b/auth.go index 52e9ad5..9df1e6a 100644 --- a/auth.go +++ b/auth.go @@ -8,6 +8,7 @@ import ( "encoding/json" "errors" "fmt" + "log" "net/http" "github.com/dgrijalva/jwt-go" @@ -24,7 +25,7 @@ type Session struct { } func unauthorized(w http.ResponseWriter) error { - http.Error(w, "Unauthorized", http.StatusUnauthorized) + log.Println("Unauthorized") return errors.New("Unauthorized") } diff --git a/build.go b/build.go index 618e19b..6d558c1 100644 --- a/build.go +++ b/build.go @@ -50,9 +50,8 @@ func processBuild(msg *nats.Msg) { bc.Publish(id, data) go func(bc *broadcast.Server) { // Wait for any late connecting clients before closing stream - time.Sleep(1 * time.Second) + time.Sleep(broadcast.DefaultMaxInactivity) log.Println("Closing stream: ", id) - bc.RemoveStream(id) }(bc) } } diff --git a/handler.go b/handler.go index 10997b1..4e58817 100644 --- a/handler.go +++ b/handler.go @@ -7,6 +7,7 @@ package main import ( // "errors" + "log" "net/http" "github.com/gorilla/websocket" @@ -27,6 +28,7 @@ func handler(w http.ResponseWriter, r *http.Request) { } var authorized bool + var areq *Session var ch chan *broadcast.Event var sub *broadcast.Subscriber @@ -40,7 +42,7 @@ func handler(w http.ResponseWriter, r *http.Request) { for { if !authorized { - areq, err := authenticate(w, c) + areq, err = authenticate(w, c) if err != nil { return } @@ -57,8 +59,10 @@ func handler(w http.ResponseWriter, r *http.Request) { return } + log.Println("Sending Message to ", areq.Stream) err := c.WriteMessage(websocket.TextMessage, msg.Data) if err != nil { + log.Println("failed to write to connection") _ = internalerror(w) return } @@ -87,20 +91,21 @@ func register(w http.ResponseWriter, s *Session) (*broadcast.Subscriber, chan *b } func upgradefail(w http.ResponseWriter) { + log.Println("Unable to upgrade to websocket connection") http.Error(w, "Unable to upgrade to websocket connection", http.StatusBadRequest) } func badrequest(w http.ResponseWriter) error { - http.Error(w, "Could not process sent data", http.StatusBadRequest) + log.Println("Could not process sent data") return errors.New("Could not process sent data") } func badstream(w http.ResponseWriter) error { - http.Error(w, "Please specify a valid stream", http.StatusBadRequest) + log.Println("Please specify a valid stream") return errors.New("Please specify a valid stream") } func internalerror(w http.ResponseWriter) error { - http.Error(w, "Internal server error", http.StatusInternalServerError) + log.Println("Internal server error") return errors.New("Internal server error") } From d044eed5b3b61a34492519b48098c981e65a5b76 Mon Sep 17 00:00:00 2001 From: Tom Bevan Date: Wed, 30 Jan 2019 18:33:42 +0000 Subject: [PATCH 2/7] increased logging on bad input --- auth.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/auth.go b/auth.go index 9df1e6a..fde60fc 100644 --- a/auth.go +++ b/auth.go @@ -34,11 +34,13 @@ func authenticate(w http.ResponseWriter, c *websocket.Conn) (*Session, error) { mt, message, err := c.ReadMessage() if err != nil { + log.Println(string(message)) return nil, badrequest(w) } err = json.Unmarshal(message, &s) if err != nil { + log.Println(string(message)) return nil, badrequest(w) } From ea2c65f1926f04d64a26239f6a1273e73766f2d2 Mon Sep 17 00:00:00 2001 From: Tom Bevan Date: Tue, 5 Feb 2019 17:35:04 +0000 Subject: [PATCH 3/7] cleaned code and increased logging, set timeout on opened websockets that do not auth --- Gopkg.lock | 13 ++++++-- auth.go | 95 ++++++++++++++++++++++++++++++++++++++-------------- handler.go | 98 +++++++++++++++++++----------------------------------- 3 files changed, 115 insertions(+), 91 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 3655a43..c6c9c57 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -17,6 +17,14 @@ pruneopts = "" revision = "9588aab91bdf53d2fa430f9505b33457d3fcb3bc" +[[projects]] + digest = "1:a25a2c5ae694b01713fb6cd03c3b1ac1ccc1902b9f0a922680a88ec254f968e1" + name = "github.com/google/uuid" + packages = ["."] + pruneopts = "" + revision = "9b3b1e0f5f99ae461456d768e7d301a7acdaa2d8" + version = "v1.1.0" + [[projects]] digest = "1:64d212c703a2b94054be0ce470303286b177ad260b2f89a307e3d1bb6c073ef6" name = "github.com/gorilla/websocket" @@ -86,11 +94,11 @@ [[projects]] branch = "master" - digest = "1:48c96cc3c5c5f1a21392c1ceea468aac11932789818e25d1a144724d6331857c" + digest = "1:8358619cbf7369ca45cc080d1fbb32a5ec1f3c9107670bf7084edfd1fca19f14" name = "github.com/r3labs/broadcast" packages = ["."] pruneopts = "" - revision = "3201c0671907e667adfd457d5e08be075f59dfcc" + revision = "4dae0cec43490a508826503da448ab0db38ad551" [[projects]] branch = "master" @@ -128,6 +136,7 @@ input-imports = [ "github.com/dgrijalva/jwt-go", "github.com/ernestio/ernest-config-client", + "github.com/google/uuid", "github.com/gorilla/websocket", "github.com/nats-io/go-nats", "github.com/r3labs/broadcast", diff --git a/auth.go b/auth.go index fde60fc..1f11909 100644 --- a/auth.go +++ b/auth.go @@ -9,10 +9,11 @@ import ( "errors" "fmt" "log" - "net/http" + "time" "github.com/dgrijalva/jwt-go" "github.com/gorilla/websocket" + "github.com/r3labs/broadcast" ) // Session : stores authentication data @@ -20,52 +21,96 @@ type Session struct { Token string `json:"token"` Stream *string `json:"stream"` EventID *string `json:"event_id"` - Username string - Authenticated bool + Username string `json:"-"` + authenticated bool + subscriber *broadcast.Subscriber + channel chan *broadcast.Event } -func unauthorized(w http.ResponseWriter) error { - log.Println("Unauthorized") +func unauthorized(c *websocket.Conn, err error) error { + if err != nil { + log.Println("Unauthorized:", err.Error()) + } else { + log.Println("Unauthorized") + } + _ = c.WriteMessage(websocket.CloseMessage, []byte(`{"status": "unauthorized"}`)) return errors.New("Unauthorized") } -func authenticate(w http.ResponseWriter, c *websocket.Conn) (*Session, error) { - var s Session +func getAuthMessage(c *websocket.Conn, s *Session) error { + // timeout after 2 seconds if no request is sent + c.SetReadDeadline(time.Now().Add(time.Second * 2)) - mt, message, err := c.ReadMessage() + _, message, err := c.ReadMessage() if err != nil { - log.Println(string(message)) - return nil, badrequest(w) + return err } - err = json.Unmarshal(message, &s) - if err != nil { - log.Println(string(message)) - return nil, badrequest(w) + return json.Unmarshal(message, &s) +} + +func register(stream *string, username, requestID string) (*broadcast.Subscriber, chan *broadcast.Event, error) { + if stream == nil { + return nil, nil, errors.New("no stream specified") } - token, err := jwt.Parse(s.Token, func(t *jwt.Token) (interface{}, error) { - if t.Method.Alg() != jwt.SigningMethodHS256.Alg() { - return nil, fmt.Errorf("unexpected jwt signing method=%v", t.Header["alg"]) - } - return []byte(secret), nil - }) + log.Printf("[%s] subscribing to stream: %s\n", requestID, *stream) + if !bc.StreamExists(*stream) && !bc.AutoStream { + return nil, nil, errors.New("stream does not exist") + } else if !bc.StreamExists(*stream) && bc.AutoStream { + bc.CreateStream(*stream) + } + + sub := bc.GetStreamSubscriber(*stream, username) + if sub == nil { + sub = broadcast.NewSubscriber(username) + bc.Register(*stream, sub) + } + + return sub, sub.Connect(), nil +} + +func authenticate(c *websocket.Conn, requestID string) (*Session, error) { + var s Session + + log.Printf("[%s] authenticating user\n", requestID) + + err := getAuthMessage(c, &s) + if err != nil { + return nil, err + } + + token, err := jwt.Parse(s.Token, jwtVerify) if err != nil || !token.Valid { - _ = c.WriteMessage(mt, []byte(`{"status": "unauthorized"}`)) - return nil, unauthorized(w) + return nil, errors.New("invalid token") } - s.Authenticated = true + s.authenticated = true + claims, ok := token.Claims.(jwt.MapClaims) if ok { s.Username = claims["username"].(string) } - err = c.WriteMessage(mt, []byte(`{"status": "ok"}`)) + log.Printf("[%s] user authenticated\n", requestID) + err = c.WriteMessage(websocket.TextMessage, []byte(`{"status": "ok"}`)) if err != nil { - return nil, internalerror(w) + return nil, err + } + + // register to stream + s.subscriber, s.channel, err = register(s.Stream, s.Username, requestID) + if err != nil { + return nil, err } return &s, nil } + +func jwtVerify(t *jwt.Token) (interface{}, error) { + if t.Method.Alg() != jwt.SigningMethodHS256.Alg() { + return nil, fmt.Errorf("unexpected jwt signing method=%v", t.Header["alg"]) + } + return []byte(secret), nil +} diff --git a/handler.go b/handler.go index 4e58817..2d1434e 100644 --- a/handler.go +++ b/handler.go @@ -6,12 +6,12 @@ package main import ( // - "errors" + "log" "net/http" + "github.com/google/uuid" "github.com/gorilla/websocket" - "github.com/r3labs/broadcast" ) var upgrader = websocket.Upgrader{ @@ -21,91 +21,61 @@ var upgrader = websocket.Upgrader{ } func handler(w http.ResponseWriter, r *http.Request) { + var session *Session + + reqid := uuid.New().String() + + log.Println("client connected:", reqid) + c, err := upgrader.Upgrade(w, r, nil) if err != nil { - upgradefail(w) + upgradefail(w, err) return } - var authorized bool - var areq *Session - var ch chan *broadcast.Event - var sub *broadcast.Subscriber - defer func() { + log.Println("client disconnected:", reqid) _ = c.Close() - if ch != nil && sub != nil { - sub.Disconnect(ch) + if session.channel != nil && session.subscriber != nil { + session.subscriber.Disconnect( + session.channel, + ) } }() for { - if !authorized { - areq, err = authenticate(w, c) - if err != nil { - return - } - - sub, ch, err = register(w, areq) + if !session.authenticated { + session, err = authenticate(c, reqid) if err != nil { - return - } - - authorized = true - } else { - msg, ok := <-ch - if !ok { - return - } - - log.Println("Sending Message to ", areq.Stream) - err := c.WriteMessage(websocket.TextMessage, msg.Data) - if err != nil { - log.Println("failed to write to connection") - _ = internalerror(w) + badrequest(c, reqid, err) return } } - } -} -func register(w http.ResponseWriter, s *Session) (*broadcast.Subscriber, chan *broadcast.Event, error) { - if s.Stream == nil { - return nil, nil, badstream(w) - } + msg, ok := <-session.channel + if !ok { + log.Printf("[%s] event channel closed: %s\n", reqid, *session.Stream) + return + } - if !bc.StreamExists(*s.Stream) && !bc.AutoStream { - return nil, nil, badstream(w) - } else if !bc.StreamExists(*s.Stream) && bc.AutoStream { - bc.CreateStream(*s.Stream) - } + log.Println("sending message to:", *session.Stream) - sub := bc.GetStreamSubscriber(*s.Stream, s.Username) - if sub == nil { - sub = broadcast.NewSubscriber(s.Username) - bc.Register(*s.Stream, sub) + err := c.WriteMessage(websocket.TextMessage, msg.Data) + if err != nil { + badrequest(c, reqid, err) + return + } } - - return sub, sub.Connect(), nil } -func upgradefail(w http.ResponseWriter) { - log.Println("Unable to upgrade to websocket connection") +func upgradefail(w http.ResponseWriter, err error) { + log.Println("Unable to upgrade to websocket connection:", err.Error()) http.Error(w, "Unable to upgrade to websocket connection", http.StatusBadRequest) } -func badrequest(w http.ResponseWriter) error { - log.Println("Could not process sent data") - return errors.New("Could not process sent data") -} - -func badstream(w http.ResponseWriter) error { - log.Println("Please specify a valid stream") - return errors.New("Please specify a valid stream") -} - -func internalerror(w http.ResponseWriter) error { - log.Println("Internal server error") - return errors.New("Internal server error") +func badrequest(c *websocket.Conn, reqid string, err error) { + log.Printf("[%s] bad request: %s\n", reqid, err.Error()) + _ = c.WriteMessage(websocket.CloseUnsupportedData, []byte(`{"error": "bad request"}`)) + c.Close() } From 2774f5a8cb531f3f89a9b8fec083d658050ecc7d Mon Sep 17 00:00:00 2001 From: Tom Bevan Date: Tue, 5 Feb 2019 17:48:13 +0000 Subject: [PATCH 4/7] additional fixes --- handler.go | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/handler.go b/handler.go index 2d1434e..2c153ae 100644 --- a/handler.go +++ b/handler.go @@ -21,11 +21,9 @@ var upgrader = websocket.Upgrader{ } func handler(w http.ResponseWriter, r *http.Request) { - var session *Session - reqid := uuid.New().String() - log.Println("client connected:", reqid) + log.Printf("[%s] client connected\n", reqid) c, err := upgrader.Upgrade(w, r, nil) if err != nil { @@ -34,18 +32,14 @@ func handler(w http.ResponseWriter, r *http.Request) { } defer func() { - log.Println("client disconnected:", reqid) + log.Printf("[%s] client disconnected\n", reqid) _ = c.Close() - - if session.channel != nil && session.subscriber != nil { - session.subscriber.Disconnect( - session.channel, - ) - } }() + var session *Session + for { - if !session.authenticated { + if session == nil { session, err = authenticate(c, reqid) if err != nil { badrequest(c, reqid, err) @@ -59,8 +53,7 @@ func handler(w http.ResponseWriter, r *http.Request) { return } - log.Println("sending message to:", *session.Stream) - + log.Printf("[%s] sending message to: %s\n", reqid, *session.Stream) err := c.WriteMessage(websocket.TextMessage, msg.Data) if err != nil { badrequest(c, reqid, err) From cf888e7cb75394862af0ff0cf1e066b7c328788d Mon Sep 17 00:00:00 2001 From: Tom Bevan Date: Tue, 5 Feb 2019 18:58:02 +0000 Subject: [PATCH 5/7] cleanup of subscriber --- handler.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/handler.go b/handler.go index 2c153ae..5bce4b8 100644 --- a/handler.go +++ b/handler.go @@ -21,6 +21,8 @@ var upgrader = websocket.Upgrader{ } func handler(w http.ResponseWriter, r *http.Request) { + var session *Session + reqid := uuid.New().String() log.Printf("[%s] client connected\n", reqid) @@ -34,9 +36,15 @@ func handler(w http.ResponseWriter, r *http.Request) { defer func() { log.Printf("[%s] client disconnected\n", reqid) _ = c.Close() - }() - var session *Session + if session == nil { + return + } + + if session.subscriber != nil { + session.subscriber.Disconnect(session.channel) + } + }() for { if session == nil { From dc5da7790f3527eef3b607c7361a84787f2f0c14 Mon Sep 17 00:00:00 2001 From: Tom Bevan Date: Tue, 5 Mar 2019 13:39:03 +0000 Subject: [PATCH 6/7] fixing timeouts --- auth.go | 2 +- build.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/auth.go b/auth.go index 1f11909..2a9d810 100644 --- a/auth.go +++ b/auth.go @@ -39,7 +39,7 @@ func unauthorized(c *websocket.Conn, err error) error { func getAuthMessage(c *websocket.Conn, s *Session) error { // timeout after 2 seconds if no request is sent - c.SetReadDeadline(time.Now().Add(time.Second * 2)) + c.SetReadDeadline(time.Now().Add(time.Second * 5)) _, message, err := c.ReadMessage() if err != nil { diff --git a/build.go b/build.go index 6d558c1..77a49fa 100644 --- a/build.go +++ b/build.go @@ -50,7 +50,7 @@ func processBuild(msg *nats.Msg) { bc.Publish(id, data) go func(bc *broadcast.Server) { // Wait for any late connecting clients before closing stream - time.Sleep(broadcast.DefaultMaxInactivity) + time.Sleep(time.Second * 10) log.Println("Closing stream: ", id) }(bc) } From eda8a622eebafe75566038ed10988c2241717959 Mon Sep 17 00:00:00 2001 From: Tom Bevan Date: Tue, 5 Mar 2019 13:40:33 +0000 Subject: [PATCH 7/7] fixing lint errors --- auth.go | 2 +- handler.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/auth.go b/auth.go index 2a9d810..78bb2af 100644 --- a/auth.go +++ b/auth.go @@ -39,7 +39,7 @@ func unauthorized(c *websocket.Conn, err error) error { func getAuthMessage(c *websocket.Conn, s *Session) error { // timeout after 2 seconds if no request is sent - c.SetReadDeadline(time.Now().Add(time.Second * 5)) + _ = c.SetReadDeadline(time.Now().Add(time.Second * 5)) _, message, err := c.ReadMessage() if err != nil { diff --git a/handler.go b/handler.go index 5bce4b8..bf22717 100644 --- a/handler.go +++ b/handler.go @@ -78,5 +78,5 @@ func upgradefail(w http.ResponseWriter, err error) { func badrequest(c *websocket.Conn, reqid string, err error) { log.Printf("[%s] bad request: %s\n", reqid, err.Error()) _ = c.WriteMessage(websocket.CloseUnsupportedData, []byte(`{"error": "bad request"}`)) - c.Close() + _ = c.Close() }