From 9303cb60fcb21b47af61b7bd98148882c1d8c4e3 Mon Sep 17 00:00:00 2001 From: Tom Bevan Date: Thu, 6 Jul 2017 18:33:12 +0100 Subject: [PATCH] fixing tests and sse library interface changes --- component.go | 17 +++++++++----- main_test.go | 63 ++++++++++++++++++++++++---------------------------- nats.go | 2 -- service.go | 10 +++++---- setup.go | 4 ++-- 5 files changed, 49 insertions(+), 47 deletions(-) diff --git a/component.go b/component.go index aa5db03..bc96275 100644 --- a/component.go +++ b/component.go @@ -10,8 +10,10 @@ import ( "strings" "github.com/nats-io/nats" + "github.com/r3labs/sse" ) +// Component : holds component values type Component struct { ID string `json:"_component_id"` Subject string `json:"_subject"` @@ -35,19 +37,24 @@ func processComponent(msg *nats.Msg) { } id := c.getID() - data, err := json.Marshal(c) if err != nil { log.Println(err) return } - ss.Publish(id, data) + if ss.StreamExists(id) { + ss.Publish(id, &sse.Event{Data: data}) + } } func (c *Component) getID() string { - var pieces []string - pieces = strings.Split(c.Service, "-") + if strings.Contains(c.Service, "-") { + var pieces []string + pieces = strings.Split(c.Service, "-") + + return pieces[len(pieces)-1] + } - return pieces[len(pieces)-1] + return c.Service } diff --git a/main_test.go b/main_test.go index 5bc0d53..beacfbb 100644 --- a/main_test.go +++ b/main_test.go @@ -33,43 +33,43 @@ func TestMain(t *testing.T) { Convey("Given a new server", t, func() { // New Server - s = sse.New() - defer s.Close() + ss = sse.New() + defer ss.Close() mux := http.NewServeMux() - mux.HandleFunc("/events", s.HTTPHandler) + mux.HandleFunc("/events", ss.HTTPHandler) hs := httptest.NewServer(mux) url := hs.URL + "/events" Convey("When listening for NATS messages", func() { - createEvents := []string{"service.create", "service.delete"} + createEvents := []string{"service.create", "service.delete", "service.import"} for _, event := range createEvents { Convey("On receiving "+event, func() { - msg := nats.Msg{Subject: event, Data: []byte(`{"service": "test"}`)} + msg := nats.Msg{Subject: event, Data: []byte(`{"id": "test"}`)} natsHandler(&msg) time.Sleep(time.Millisecond * 10) Convey("It should create a stream for the service", func() { - So(s.StreamExists("test"), ShouldBeTrue) + So(ss.StreamExists("test"), ShouldBeTrue) }) }) } - deleteEvents := []string{"service.create.done", "service.delete.done", "service.create.error", "service.delete.error"} + deleteEvents := []string{"service.create.done", "service.delete.done", "service.import.done", "service.create.error", "service.delete.error", "service.import.error"} for _, event := range deleteEvents { - s.CreateStream("test") + ss.CreateStream("test") time.Sleep(time.Millisecond * 10) Convey("On receiving "+event, func() { - msg := nats.Msg{Subject: event, Data: []byte(`{"service": "test"}`)} + msg := nats.Msg{Subject: event, Data: []byte(`{"id": "test"}`)} natsHandler(&msg) time.Sleep(time.Millisecond * 1500) Convey("It should remove the services stream", func() { - So(s.StreamExists("test"), ShouldBeFalse) + So(ss.StreamExists("test"), ShouldBeFalse) }) }) @@ -77,26 +77,26 @@ func TestMain(t *testing.T) { Convey("On receiving an unknown message", func() { // Clean server - s.RemoveStream("test") + ss.RemoveStream("test") time.Sleep(time.Millisecond * 10) - msg := nats.Msg{Subject: "test.event", Data: []byte(`{"service": "test"}`)} + msg := nats.Msg{Subject: "test.event", Data: []byte(`{"id": "test"}`)} natsHandler(&msg) Convey("It should not create a stream", func() { - So(s.StreamExists("test"), ShouldBeFalse) + So(ss.StreamExists("test"), ShouldBeFalse) }) }) - Convey("When receiving monitor.user", func() { - testEvent := `{"service": "test", "messages":[{"body": "test", "color": "blue"}]}` - msg := nats.Msg{Subject: "monitor.user", Data: []byte(testEvent)} + Convey("When receiving component event network.create.aws.done", func() { + testEvent := `{"service": "test", "name": "network"}` + msg := nats.Msg{Subject: "network.create.aws.done", Data: []byte(testEvent)} Convey("And a stream exists", func() { rcv := make(chan *sse.Event) cl := sse.NewClient(url) - s.CreateStream("test") + ss.CreateStream("test") time.Sleep(time.Millisecond * 10) go func() { @@ -107,14 +107,19 @@ func TestMain(t *testing.T) { Convey("It should publish a message to the stream", func() { natsHandler(&msg) - event, err := wait(rcv, time.Millisecond*100) + for { + event, err := wait(rcv, time.Millisecond*100) + So(err, ShouldBeNil) - So(err, ShouldBeNil) - So(string(event.Data), ShouldEqual, `{"body":"test","level":""}`) + if len(event.Data) > 0 { + So(string(event.Data), ShouldEqual, `{"_component_id":"","_subject":"network.create.aws.done","_component":"","_state":"","_action":"","_provider":"","name":"network","service":"test"}`) + break + } + } }) }) - s.RemoveStream("test") + ss.RemoveStream("test") Convey("And a stream doesn't exist", func() { rcv := make(chan *sse.Event) @@ -122,19 +127,9 @@ func TestMain(t *testing.T) { time.Sleep(time.Millisecond * 10) - go func() { - _ = cl.SubscribeChan("test", rcv) - }() - time.Sleep(time.Millisecond * 10) - - Convey("It should not publish a message to the stream", func() { - natsHandler(&msg) - event, err := wait(rcv, time.Millisecond*100) - - So(err, ShouldBeNil) - if err != nil { - So(string(event.Data), ShouldNotEqual, `{"body":"test","color":"blue"}`) - } + Convey("It should error when connecting to the stream", func() { + err := cl.SubscribeChan("test", rcv) + So(err, ShouldNotBeNil) }) }) }) diff --git a/nats.go b/nats.go index c3fa46f..1f590c7 100644 --- a/nats.go +++ b/nats.go @@ -34,7 +34,5 @@ func natsHandler(msg *nats.Msg) { processService(msg) case pattern.Match(msg.Subject, components...): processComponent(msg) - default: - return } } diff --git a/service.go b/service.go index 32363e2..336139b 100644 --- a/service.go +++ b/service.go @@ -14,6 +14,7 @@ import ( "github.com/r3labs/sse" ) +// Service : holds service values type Service struct { ID string `json:"id"` Name string `json:"name"` @@ -43,12 +44,13 @@ func processService(msg *nats.Msg) { case "service.create", "service.delete", "service.import": log.Println("Creating stream: ", id) ss.CreateStream(id) - ss.Publish(id, data) + ss.Publish(id, &sse.Event{Data: data}) case "service.create.done", "service.create.error", "service.delete.done", "service.delete.error", "service.import.done", "service.import.error": - ss.Publish(id, data) - time.Sleep(10 * time.Millisecond) - log.Println("Closing stream: ", id) + ss.Publish(id, &sse.Event{Data: data}) go func(ss *sse.Server) { + // Wait for any late connecting clients before closing stream + time.Sleep(1 * time.Second) + log.Println("Closing stream: ", id) ss.RemoveStream(id) }(ss) } diff --git a/setup.go b/setup.go index 6ee46f0..53ff013 100644 --- a/setup.go +++ b/setup.go @@ -29,8 +29,8 @@ func setup() { secret = os.Getenv("JWT_SECRET") if secret == "" { - token, err := nc.Request("config.get.jwt_token", []byte(""), 1*time.Second) - if err != nil { + token, aerr := nc.Request("config.get.jwt_token", []byte(""), 1*time.Second) + if aerr != nil { panic("Can't get jwt_config config") }