Skip to content

Commit

Permalink
fixing tests and sse library interface changes
Browse files Browse the repository at this point in the history
  • Loading branch information
purehyperbole authored and g3kk0 committed Jul 10, 2017
1 parent d90ded2 commit 9303cb6
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 47 deletions.
17 changes: 12 additions & 5 deletions component.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"strings"

"github.com/nats-io/nats"
"github.com/r3labs/sse"
)

// Component : holds component values
type Component struct {
ID string `json:"_component_id"`
Subject string `json:"_subject"`
Expand All @@ -35,19 +37,24 @@ func processComponent(msg *nats.Msg) {
}

id := c.getID()

data, err := json.Marshal(c)
if err != nil {
log.Println(err)
return
}

ss.Publish(id, data)
if ss.StreamExists(id) {
ss.Publish(id, &sse.Event{Data: data})
}
}

func (c *Component) getID() string {
var pieces []string
pieces = strings.Split(c.Service, "-")
if strings.Contains(c.Service, "-") {
var pieces []string
pieces = strings.Split(c.Service, "-")

return pieces[len(pieces)-1]
}

return pieces[len(pieces)-1]
return c.Service
}
63 changes: 29 additions & 34 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,70 +33,70 @@ func TestMain(t *testing.T) {

Convey("Given a new server", t, func() {
// New Server
s = sse.New()
defer s.Close()
ss = sse.New()
defer ss.Close()

mux := http.NewServeMux()
mux.HandleFunc("/events", s.HTTPHandler)
mux.HandleFunc("/events", ss.HTTPHandler)
hs := httptest.NewServer(mux)
url := hs.URL + "/events"

Convey("When listening for NATS messages", func() {
createEvents := []string{"service.create", "service.delete"}
createEvents := []string{"service.create", "service.delete", "service.import"}
for _, event := range createEvents {
Convey("On receiving "+event, func() {
msg := nats.Msg{Subject: event, Data: []byte(`{"service": "test"}`)}
msg := nats.Msg{Subject: event, Data: []byte(`{"id": "test"}`)}
natsHandler(&msg)

time.Sleep(time.Millisecond * 10)

Convey("It should create a stream for the service", func() {
So(s.StreamExists("test"), ShouldBeTrue)
So(ss.StreamExists("test"), ShouldBeTrue)
})

})
}

deleteEvents := []string{"service.create.done", "service.delete.done", "service.create.error", "service.delete.error"}
deleteEvents := []string{"service.create.done", "service.delete.done", "service.import.done", "service.create.error", "service.delete.error", "service.import.error"}
for _, event := range deleteEvents {
s.CreateStream("test")
ss.CreateStream("test")
time.Sleep(time.Millisecond * 10)

Convey("On receiving "+event, func() {
msg := nats.Msg{Subject: event, Data: []byte(`{"service": "test"}`)}
msg := nats.Msg{Subject: event, Data: []byte(`{"id": "test"}`)}
natsHandler(&msg)

time.Sleep(time.Millisecond * 1500)

Convey("It should remove the services stream", func() {
So(s.StreamExists("test"), ShouldBeFalse)
So(ss.StreamExists("test"), ShouldBeFalse)
})

})
}

Convey("On receiving an unknown message", func() {
// Clean server
s.RemoveStream("test")
ss.RemoveStream("test")
time.Sleep(time.Millisecond * 10)

msg := nats.Msg{Subject: "test.event", Data: []byte(`{"service": "test"}`)}
msg := nats.Msg{Subject: "test.event", Data: []byte(`{"id": "test"}`)}
natsHandler(&msg)

Convey("It should not create a stream", func() {
So(s.StreamExists("test"), ShouldBeFalse)
So(ss.StreamExists("test"), ShouldBeFalse)
})
})

Convey("When receiving monitor.user", func() {
testEvent := `{"service": "test", "messages":[{"body": "test", "color": "blue"}]}`
msg := nats.Msg{Subject: "monitor.user", Data: []byte(testEvent)}
Convey("When receiving component event network.create.aws.done", func() {
testEvent := `{"service": "test", "name": "network"}`
msg := nats.Msg{Subject: "network.create.aws.done", Data: []byte(testEvent)}

Convey("And a stream exists", func() {
rcv := make(chan *sse.Event)
cl := sse.NewClient(url)

s.CreateStream("test")
ss.CreateStream("test")
time.Sleep(time.Millisecond * 10)

go func() {
Expand All @@ -107,34 +107,29 @@ func TestMain(t *testing.T) {
Convey("It should publish a message to the stream", func() {
natsHandler(&msg)

event, err := wait(rcv, time.Millisecond*100)
for {
event, err := wait(rcv, time.Millisecond*100)
So(err, ShouldBeNil)

So(err, ShouldBeNil)
So(string(event.Data), ShouldEqual, `{"body":"test","level":""}`)
if len(event.Data) > 0 {
So(string(event.Data), ShouldEqual, `{"_component_id":"","_subject":"network.create.aws.done","_component":"","_state":"","_action":"","_provider":"","name":"network","service":"test"}`)
break
}
}
})
})

s.RemoveStream("test")
ss.RemoveStream("test")

Convey("And a stream doesn't exist", func() {
rcv := make(chan *sse.Event)
cl := sse.NewClient(url)

time.Sleep(time.Millisecond * 10)

go func() {
_ = cl.SubscribeChan("test", rcv)
}()
time.Sleep(time.Millisecond * 10)

Convey("It should not publish a message to the stream", func() {
natsHandler(&msg)
event, err := wait(rcv, time.Millisecond*100)

So(err, ShouldBeNil)
if err != nil {
So(string(event.Data), ShouldNotEqual, `{"body":"test","color":"blue"}`)
}
Convey("It should error when connecting to the stream", func() {
err := cl.SubscribeChan("test", rcv)
So(err, ShouldNotBeNil)
})
})
})
Expand Down
2 changes: 0 additions & 2 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,5 @@ func natsHandler(msg *nats.Msg) {
processService(msg)
case pattern.Match(msg.Subject, components...):
processComponent(msg)
default:
return
}
}
10 changes: 6 additions & 4 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/r3labs/sse"
)

// Service : holds service values
type Service struct {
ID string `json:"id"`
Name string `json:"name"`
Expand Down Expand Up @@ -43,12 +44,13 @@ func processService(msg *nats.Msg) {
case "service.create", "service.delete", "service.import":
log.Println("Creating stream: ", id)
ss.CreateStream(id)
ss.Publish(id, data)
ss.Publish(id, &sse.Event{Data: data})
case "service.create.done", "service.create.error", "service.delete.done", "service.delete.error", "service.import.done", "service.import.error":
ss.Publish(id, data)
time.Sleep(10 * time.Millisecond)
log.Println("Closing stream: ", id)
ss.Publish(id, &sse.Event{Data: data})
go func(ss *sse.Server) {
// Wait for any late connecting clients before closing stream
time.Sleep(1 * time.Second)
log.Println("Closing stream: ", id)
ss.RemoveStream(id)
}(ss)
}
Expand Down
4 changes: 2 additions & 2 deletions setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func setup() {

secret = os.Getenv("JWT_SECRET")
if secret == "" {
token, err := nc.Request("config.get.jwt_token", []byte(""), 1*time.Second)
if err != nil {
token, aerr := nc.Request("config.get.jwt_token", []byte(""), 1*time.Second)
if aerr != nil {
panic("Can't get jwt_config config")
}

Expand Down

0 comments on commit 9303cb6

Please sign in to comment.