diff --git a/Gopkg.lock b/Gopkg.lock index e3b9d9b..07ab151 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -4,20 +4,26 @@ [[projects]] name = "github.com/dgrijalva/jwt-go" packages = ["."] - revision = "dbeaa9332f19a944acb5736b4456cfcc02140e29" - version = "v3.1.0" + revision = "06ea1031745cb8b3dab3f6a236daf2b0aa468b7e" + version = "v3.2.0" [[projects]] branch = "master" name = "github.com/ernestio/ernest-config-client" packages = ["."] - revision = "d767474ae0ea944ec7ee6b35d2204d2439ee0fae" + revision = "3031a29ef3927b84e94180b64a88f7efed657195" [[projects]] branch = "master" name = "github.com/gopherjs/gopherjs" packages = ["js"] - revision = "178c176a91fe05e3e6c58fa5c989bad19e6cdcb3" + revision = "8dffc02ea1cb8398bb73f30424697c60fcf8d4c5" + +[[projects]] + name = "github.com/gorilla/websocket" + packages = ["."] + revision = "ea4d1f681babbce9545c9c5f3d5194a789c89f5b" + version = "v1.2.0" [[projects]] name = "github.com/jinzhu/gorm" @@ -25,14 +31,14 @@ ".", "dialects/postgres" ] - revision = "5174cc5c242a728b435ea2be8a2f7f998e15429b" - version = "v1.0" + revision = "6ed508ec6a4ecb3531899a69cbc746ccf65a4166" + version = "v1.9.1" [[projects]] branch = "master" name = "github.com/jinzhu/inflection" packages = ["."] - revision = "1c35d901db3da928c72a72d8458480cc9ade058f" + revision = "04140366298a54a039076d798123ffa108fff46c" [[projects]] name = "github.com/jtolds/gls" @@ -48,7 +54,7 @@ "hstore", "oid" ] - revision = "19c8e9ad00952ce0c64489b60e8df88bb16dd514" + revision = "90697d60dd844d5ef6ff15135d0203f65d2f53b8" [[projects]] name = "github.com/nats-io/go-nats" @@ -56,14 +62,14 @@ "encoders/builtin", "util" ] - revision = "d66cb54e6b7bdd93f0b28afc8450d84c780dfb68" - version = "v1.4.0" + revision = "062418ea1c2181f52dc0f954f6204370519a868b" + version = "v1.5.0" [[projects]] name = "github.com/nats-io/nats" packages = ["."] - revision = "d66cb54e6b7bdd93f0b28afc8450d84c780dfb68" - version = "v1.4.0" + revision = "062418ea1c2181f52dc0f954f6204370519a868b" + version = "v1.5.0" [[projects]] name = "github.com/nats-io/nuid" @@ -77,6 +83,12 @@ packages = ["."] revision = "0f4f76b1155209cbdd18da3693f44f4fc9f8275d" +[[projects]] + branch = "master" + name = "github.com/r3labs/broadcast" + packages = ["."] + revision = "9b65253dd957d2c57c9a233b8ead911878fc2778" + [[projects]] branch = "master" name = "github.com/r3labs/pattern" @@ -96,8 +108,8 @@ "internal/go-render/render", "internal/oglematchers" ] - revision = "0b37b35ec7434b77e77a4bb29b79677cced992ea" - version = "1.8.1" + revision = "7678a5452ebea5b7090a6b163f844c133f523da2" + version = "1.8.3" [[projects]] name = "github.com/smartystreets/goconvey" @@ -130,6 +142,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "44727d65aa783160624cf0ed633e507cc90694234ffba40073ec5c049430708c" + inputs-digest = "5ce2ae3e56b76ccbb21f9556092fd8c3e2c6a93e64b17affca12ad502ddd4846" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index e35bd2b..8d8136e 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -37,8 +37,8 @@ name = "github.com/r3labs/pattern" [[constraint]] - name = "github.com/r3labs/sse" - version = "1.0.1" + name = "github.com/r3labs/broadcast" + branch = "master" [[constraint]] name = "github.com/smartystreets/goconvey" diff --git a/auth.go b/auth.go index c712b87..0226976 100644 --- a/auth.go +++ b/auth.go @@ -5,51 +5,64 @@ package main import ( + "encoding/json" "errors" "fmt" "net/http" "github.com/dgrijalva/jwt-go" + "github.com/gorilla/websocket" ) -func unauthorized(w http.ResponseWriter) { +// Session : stores authentication data +type Session struct { + Token string `json:"token"` + Stream *string `json:"stream"` + EventID *string `json:"event_id"` + Username string + Authenticated bool +} + +func unauthorized(w http.ResponseWriter) error { http.Error(w, "Unauthorized", http.StatusUnauthorized) + return errors.New("Unauthorized") } -func extractToken(r *http.Request) (string, error) { - auth := r.Header.Get("Authorization") - l := len("Bearer") - if len(auth) > l+1 && auth[:l] == "Bearer" { - return auth[l+1:], nil +func authenticate(w http.ResponseWriter, c *websocket.Conn) (*Session, error) { + var s Session + + mt, message, err := c.ReadMessage() + if err != nil { + return nil, badrequest(w) } - return "", errors.New("Invalid Token") -} -func authMiddleware(w http.ResponseWriter, r *http.Request) { - // Check Auth, Until Proper Auth Service is implemented - authToken, err := extractToken(r) + err = json.Unmarshal(message, &s) if err != nil { - unauthorized(w) - return + return nil, badrequest(w) } - token, err := jwt.Parse(authToken, func(t *jwt.Token) (interface{}, error) { + token, err := jwt.Parse(s.Token, func(t *jwt.Token) (interface{}, error) { if t.Method.Alg() != jwt.SigningMethodHS256.Alg() { return nil, fmt.Errorf("unexpected jwt signing method=%v", t.Header["alg"]) } return []byte(secret), nil }) - if err != nil { - unauthorized(w) - return + if err != nil || !token.Valid { + c.WriteMessage(mt, []byte(`{"status": "unauthorized"}`)) + return nil, unauthorized(w) + } + + s.Authenticated = true + claims, ok := token.Claims.(jwt.MapClaims) + if ok { + s.Username = claims["username"].(string) } - if token.Valid != true { - unauthorized(w) - return + err = c.WriteMessage(mt, []byte(`{"status": "ok"}`)) + if err != nil { + return nil, internalerror(w) } - // Pass to sse server - ss.HTTPHandler(w, r) + return &s, nil } diff --git a/service.go b/build.go similarity index 87% rename from service.go rename to build.go index 29d4a01..7de9759 100644 --- a/service.go +++ b/build.go @@ -10,7 +10,7 @@ import ( "time" "github.com/nats-io/nats" - "github.com/r3labs/sse" + "github.com/r3labs/broadcast" ) // Build : holds builds values @@ -42,16 +42,16 @@ func processBuild(msg *nats.Msg) { switch msg.Subject { case "build.create", "build.delete", "build.import", "environment.sync": log.Println("Creating stream: ", id) - ss.CreateStream(id) - ss.Publish(id, &sse.Event{Data: data}) + bc.CreateStream(id) + bc.Publish(id, data) case "build.create.done", "build.create.error", "build.delete.done", "build.delete.error", "build.import.done", "build.import.error", "environment.sync.done", "environment.sync.error": - ss.Publish(id, &sse.Event{Data: data}) - go func(ss *sse.Server) { + bc.Publish(id, data) + go func(bc *broadcast.Server) { // Wait for any late connecting clients before closing stream time.Sleep(1 * time.Second) log.Println("Closing stream: ", id) - ss.RemoveStream(id) - }(ss) + bc.RemoveStream(id) + }(bc) } } diff --git a/component.go b/component.go index 4d8ae75..6459fa8 100644 --- a/component.go +++ b/component.go @@ -9,7 +9,6 @@ import ( "log" "github.com/nats-io/nats" - "github.com/r3labs/sse" ) // Component : holds component values @@ -43,8 +42,8 @@ func processComponent(msg *nats.Msg) { return } - if ss.StreamExists(id) { - ss.Publish(id, &sse.Event{Data: data}) + if bc.StreamExists(id) { + bc.Publish(id, data) } } diff --git a/handler.go b/handler.go new file mode 100644 index 0000000..56f8d56 --- /dev/null +++ b/handler.go @@ -0,0 +1,95 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package main + +import ( + // + "errors" + "net/http" + + "github.com/gorilla/websocket" + "github.com/r3labs/broadcast" +) + +var upgrader = websocket.Upgrader{} + +func handler(w http.ResponseWriter, r *http.Request) { + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + upgradefail(w) + return + } + defer c.Close() + + var authorized bool + var ch chan *broadcast.Event + var sub *broadcast.Subscriber + + defer func() { + if ch != nil && sub != nil { + sub.Disconnect(ch) + } + }() + + for { + if !authorized { + areq, err := authenticate(w, c) + if err != nil { + return + } + + sub, ch, err = register(w, areq) + if err != nil { + return + } + } else { + msg := <-ch + err := c.WriteMessage(websocket.TextMessage, msg.Data) + if err != nil { + internalerror(w) + return + } + } + } +} + +func register(w http.ResponseWriter, s *Session) (*broadcast.Subscriber, chan *broadcast.Event, error) { + if s.Stream == nil { + return nil, nil, badstream(w) + } + + if !bc.StreamExists(*s.Stream) && !bc.AutoStream { + return nil, nil, badstream(w) + } else if !bc.StreamExists(*s.Stream) && bc.AutoStream { + bc.CreateStream(*s.Stream) + } + + sub := bc.GetSubscriber(s.Username) + if sub == nil { + sub = broadcast.NewSubscriber(s.Username) + bc.Register(*s.Stream, sub) + } + + return sub, sub.Connect(), nil +} + +func upgradefail(w http.ResponseWriter) { + http.Error(w, "Unable to upgrade to websocket connection", http.StatusBadRequest) +} + +func badrequest(w http.ResponseWriter) error { + http.Error(w, "Could not process sent data", http.StatusBadRequest) + return errors.New("Could not process sent data") +} + +func badstream(w http.ResponseWriter) error { + http.Error(w, "Please specify a valid stream", http.StatusBadRequest) + return errors.New("Please specify a valid stream") +} + +func internalerror(w http.ResponseWriter) error { + http.Error(w, "Internal server error", http.StatusInternalServerError) + return errors.New("Internal server error") +} diff --git a/main.go b/main.go index bdc2fc2..2ed1f4d 100644 --- a/main.go +++ b/main.go @@ -10,11 +10,11 @@ import ( "net/http" "github.com/nats-io/nats" - "github.com/r3labs/sse" + "github.com/r3labs/broadcast" ) var nc *nats.Conn -var ss *sse.Server +var bc *broadcast.Server var host string var port string var secret string @@ -25,14 +25,13 @@ func main() { defer nc.Close() // Create new SSE server - ss = sse.New() - ss.AutoStream = true - ss.EncodeBase64 = true - defer ss.Close() + bc = broadcast.New() + bc.AutoStream = true + defer bc.Close() // Create new HTTP Server and add the route handler mux := http.NewServeMux() - mux.HandleFunc("/events", authMiddleware) + mux.HandleFunc("/events", handler) // Subscribe to subjects _, err = nc.Subscribe(">", natsHandler) diff --git a/main_test.go b/main_test.go index 193bac9..63139c7 100644 --- a/main_test.go +++ b/main_test.go @@ -6,19 +6,15 @@ package main import ( "errors" - "net/http" - "net/http/httptest" - "testing" "time" - "github.com/nats-io/nats" - "github.com/r3labs/sse" - . "github.com/smartystreets/goconvey/convey" + "github.com/r3labs/broadcast" + //. "github.com/smartystreets/goconvey/convey" ) -func wait(ch chan *sse.Event, duration time.Duration) (*sse.Event, error) { +func wait(ch chan *broadcast.Event, duration time.Duration) (*broadcast.Event, error) { var err error - var msg *sse.Event + var msg *broadcast.Event select { case event := <-ch: @@ -29,6 +25,8 @@ func wait(ch chan *sse.Event, duration time.Duration) (*sse.Event, error) { return msg, err } +/* + func TestMain(t *testing.T) { Convey("Given a new server", t, func() { @@ -138,3 +136,5 @@ func TestMain(t *testing.T) { }) } + +*/