Skip to content

Commit

Permalink
Merge pull request #46 from ernestio/websockets
Browse files Browse the repository at this point in the history
implemented websockets using gorilla/websockets and r3labs/broadcast
  • Loading branch information
purehyperbole authored Jun 14, 2018
2 parents 5e60176 + d818b4f commit 9189766
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 97 deletions.
68 changes: 21 additions & 47 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
name = "github.com/r3labs/pattern"

[[constraint]]
name = "github.com/r3labs/sse"
version = "1.0.1"
name = "github.com/r3labs/broadcast"
branch = "master"

[[constraint]]
name = "github.com/smartystreets/goconvey"
Expand Down
57 changes: 35 additions & 22 deletions auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,51 +5,64 @@
package main

import (
"encoding/json"
"errors"
"fmt"
"net/http"

"github.com/dgrijalva/jwt-go"
"github.com/gorilla/websocket"
)

func unauthorized(w http.ResponseWriter) {
// Session : stores authentication data
type Session struct {
Token string `json:"token"`
Stream *string `json:"stream"`
EventID *string `json:"event_id"`
Username string
Authenticated bool
}

func unauthorized(w http.ResponseWriter) error {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return errors.New("Unauthorized")
}

func extractToken(r *http.Request) (string, error) {
auth := r.Header.Get("Authorization")
l := len("Bearer")
if len(auth) > l+1 && auth[:l] == "Bearer" {
return auth[l+1:], nil
func authenticate(w http.ResponseWriter, c *websocket.Conn) (*Session, error) {
var s Session

mt, message, err := c.ReadMessage()
if err != nil {
return nil, badrequest(w)
}
return "", errors.New("Invalid Token")
}

func authMiddleware(w http.ResponseWriter, r *http.Request) {
// Check Auth, Until Proper Auth Service is implemented
authToken, err := extractToken(r)
err = json.Unmarshal(message, &s)
if err != nil {
unauthorized(w)
return
return nil, badrequest(w)
}

token, err := jwt.Parse(authToken, func(t *jwt.Token) (interface{}, error) {
token, err := jwt.Parse(s.Token, func(t *jwt.Token) (interface{}, error) {
if t.Method.Alg() != jwt.SigningMethodHS256.Alg() {
return nil, fmt.Errorf("unexpected jwt signing method=%v", t.Header["alg"])
}
return []byte(secret), nil
})

if err != nil {
unauthorized(w)
return
if err != nil || !token.Valid {
_ = c.WriteMessage(mt, []byte(`{"status": "unauthorized"}`))
return nil, unauthorized(w)
}

s.Authenticated = true
claims, ok := token.Claims.(jwt.MapClaims)
if ok {
s.Username = claims["username"].(string)
}

if token.Valid != true {
unauthorized(w)
return
err = c.WriteMessage(mt, []byte(`{"status": "ok"}`))
if err != nil {
return nil, internalerror(w)
}

// Pass to sse server
ss.HTTPHandler(w, r)
return &s, nil
}
18 changes: 10 additions & 8 deletions service.go → build.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"time"

"github.com/nats-io/nats"
"github.com/r3labs/sse"
"github.com/r3labs/broadcast"
)

// Build : holds builds values
Expand Down Expand Up @@ -41,17 +41,19 @@ func processBuild(msg *nats.Msg) {

switch msg.Subject {
case "build.create", "build.delete", "build.import", "environment.sync":
log.Println("Creating stream: ", id)
ss.CreateStream(id)
ss.Publish(id, &sse.Event{Data: data})
if !bc.StreamExists(id) {
log.Println("Creating stream: ", id)
bc.CreateStream(id)
}
bc.Publish(id, data)
case "build.create.done", "build.create.error", "build.delete.done", "build.delete.error", "build.import.done", "build.import.error", "environment.sync.done", "environment.sync.error":
ss.Publish(id, &sse.Event{Data: data})
go func(ss *sse.Server) {
bc.Publish(id, data)
go func(bc *broadcast.Server) {
// Wait for any late connecting clients before closing stream
time.Sleep(1 * time.Second)
log.Println("Closing stream: ", id)
ss.RemoveStream(id)
}(ss)
bc.RemoveStream(id)
}(bc)
}
}

Expand Down
5 changes: 2 additions & 3 deletions component.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"log"

"github.com/nats-io/nats"
"github.com/r3labs/sse"
)

// Component : holds component values
Expand Down Expand Up @@ -43,8 +42,8 @@ func processComponent(msg *nats.Msg) {
return
}

if ss.StreamExists(id) {
ss.Publish(id, &sse.Event{Data: data})
if bc.StreamExists(id) {
bc.Publish(id, data)
}
}

Expand Down
102 changes: 102 additions & 0 deletions handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

package main

import (
//
"errors"
"net/http"

"github.com/gorilla/websocket"
"github.com/r3labs/broadcast"
)

var upgrader = websocket.Upgrader{}

func handler(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
upgradefail(w)
return
}

var authorized bool
var ch chan *broadcast.Event
var sub *broadcast.Subscriber

defer func() {
_ = c.Close()

if ch != nil && sub != nil {
sub.Disconnect(ch)
}
}()

for {
if !authorized {
areq, err := authenticate(w, c)
if err != nil {
return
}

sub, ch, err = register(w, areq)
if err != nil {
return
}

authorized = true
} else {
msg, ok := <-ch
if !ok {
return
}

err := c.WriteMessage(websocket.TextMessage, msg.Data)
if err != nil {
_ = internalerror(w)
return
}
}
}
}

func register(w http.ResponseWriter, s *Session) (*broadcast.Subscriber, chan *broadcast.Event, error) {
if s.Stream == nil {
return nil, nil, badstream(w)
}

if !bc.StreamExists(*s.Stream) && !bc.AutoStream {
return nil, nil, badstream(w)
} else if !bc.StreamExists(*s.Stream) && bc.AutoStream {
bc.CreateStream(*s.Stream)
}

sub := bc.GetSubscriber(s.Username)
if sub == nil {
sub = broadcast.NewSubscriber(s.Username)
bc.Register(*s.Stream, sub)
}

return sub, sub.Connect(), nil
}

func upgradefail(w http.ResponseWriter) {
http.Error(w, "Unable to upgrade to websocket connection", http.StatusBadRequest)
}

func badrequest(w http.ResponseWriter) error {
http.Error(w, "Could not process sent data", http.StatusBadRequest)
return errors.New("Could not process sent data")
}

func badstream(w http.ResponseWriter) error {
http.Error(w, "Please specify a valid stream", http.StatusBadRequest)
return errors.New("Please specify a valid stream")
}

func internalerror(w http.ResponseWriter) error {
http.Error(w, "Internal server error", http.StatusInternalServerError)
return errors.New("Internal server error")
}
Loading

0 comments on commit 9189766

Please sign in to comment.