From 0d705d0efd8901ebf865faabf12a80ff6b1ebb74 Mon Sep 17 00:00:00 2001 From: Mark Newman Date: Fri, 9 Jun 2017 15:40:42 +0100 Subject: [PATCH] Remove output formatting --- auth.go | 2 +- c_service.go | 60 +++++++++++++++---------------- component.go | 46 ++++++++++++++++++++++++ event.go | 35 ++++++++++++++++++ main.go | 95 +++++++++++++++++++------------------------------ nats.go | 51 +++++--------------------- notification.go | 2 +- service.go | 40 +++++++++++++++++++++ setup.go | 6 ++-- 9 files changed, 201 insertions(+), 136 deletions(-) create mode 100644 component.go create mode 100644 event.go create mode 100644 service.go diff --git a/auth.go b/auth.go index 48e05e5..c712b87 100644 --- a/auth.go +++ b/auth.go @@ -51,5 +51,5 @@ func authMiddleware(w http.ResponseWriter, r *http.Request) { } // Pass to sse server - s.HTTPHandler(w, r) + ss.HTTPHandler(w, r) } diff --git a/c_service.go b/c_service.go index 3b6d078..052e7e9 100644 --- a/c_service.go +++ b/c_service.go @@ -5,34 +5,34 @@ package main // Service : ... -type Service struct { -} +//type Service struct { +//} -// Handle : ... -func (n *Service) Handle(subject string, lines []Message) []Message { - switch subject { - case "service.create": - lines = append(lines, Message{Body: "Applying your definition", Level: "INFO"}) - case "service.delete": - lines = append(lines, Message{Body: "Starting environment deletion", Level: "INFO"}) - case "service.create.done": - lines = append(lines, Message{Body: "SUCCESS: rules successfully applied", Level: "SUCCESS"}) - lines = append(lines, Message{Body: "error", Level: "ERROR"}) - case "service.create.error": - lines = append(lines, Message{Body: "\nOops! Something went wrong. Please manually fix any errors shown above and re-apply your definition.", Level: "INFO"}) - lines = append(lines, Message{Body: "error", Level: "ERROR"}) - case "service.delete.done": - lines = append(lines, Message{Body: "SUCCESS: your environment has been successfully deleted", Level: "SUCCESS"}) - lines = append(lines, Message{Body: "success", Level: "SUCCESS"}) - case "service.delete.error": - lines = append(lines, Message{Body: "\nOops! Something went wrong. Please manually fix any errors shown above and re-apply your service deletion.", Level: "INFO"}) - lines = append(lines, Message{Body: "error", Level: "ERROR"}) - case "service.import.done": - lines = append(lines, Message{Body: "SUCCESS: service successfully imported", Level: "SUCCESS"}) - lines = append(lines, Message{Body: "error", Level: "ERROR"}) - case "service.import.error": - lines = append(lines, Message{Body: "\nOops! Something went wrong. Please manually fix any errors shown above and re-apply your definition.", Level: "INFO"}) - lines = append(lines, Message{Body: "error", Level: "ERROR"}) - } - return lines -} +// // Handle : ... +// func (n *Service) Handle(subject string, lines []Message) []Message { +// switch subject { +// case "service.create": +// lines = append(lines, Message{Body: "Applying your definition", Level: "INFO"}) +// case "service.delete": +// lines = append(lines, Message{Body: "Starting environment deletion", Level: "INFO"}) +// case "service.create.done": +// lines = append(lines, Message{Body: "SUCCESS: rules successfully applied", Level: "SUCCESS"}) +// lines = append(lines, Message{Body: "error", Level: "ERROR"}) +// case "service.create.error": +// lines = append(lines, Message{Body: "\nOops! Something went wrong. Please manually fix any errors shown above and re-apply your definition.", Level: "INFO"}) +// lines = append(lines, Message{Body: "error", Level: "ERROR"}) +// case "service.delete.done": +// lines = append(lines, Message{Body: "SUCCESS: your environment has been successfully deleted", Level: "SUCCESS"}) +// lines = append(lines, Message{Body: "success", Level: "SUCCESS"}) +// case "service.delete.error": +// lines = append(lines, Message{Body: "\nOops! Something went wrong. Please manually fix any errors shown above and re-apply your service deletion.", Level: "INFO"}) +// lines = append(lines, Message{Body: "error", Level: "ERROR"}) +// case "service.import.done": +// lines = append(lines, Message{Body: "SUCCESS: service successfully imported", Level: "SUCCESS"}) +// lines = append(lines, Message{Body: "error", Level: "ERROR"}) +// case "service.import.error": +// lines = append(lines, Message{Body: "\nOops! Something went wrong. Please manually fix any errors shown above and re-apply your definition.", Level: "INFO"}) +// lines = append(lines, Message{Body: "error", Level: "ERROR"}) +// } +// return lines +// } diff --git a/component.go b/component.go new file mode 100644 index 0000000..27e00c1 --- /dev/null +++ b/component.go @@ -0,0 +1,46 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package main + +import ( + "encoding/json" + "strings" + + "github.com/nats-io/nats" +) + +type Component struct { + ID string `json:"_component_id"` + Type string `json:"_component"` + State string `json:"_state"` + Action string `json:"_action"` + Provider string `json:"_provider"` + Name string `json:"name"` + Error string `json:"error,omitempty"` + Service string `json:"service,omitempty"` +} + +func componentHandler(msg *nats.Msg) { + var c Component + if err := json.Unmarshal(msg.Data, &c); err != nil { + panic(err) + } + + id := c.getID() + + data, err := json.Marshal(c) + if err != nil { + panic(err) + } + + publishEvent(id, data) +} + +func (c *Component) getID() string { + var pieces []string + pieces = strings.Split(c.Service, "-") + + return pieces[len(pieces)-1] +} diff --git a/event.go b/event.go new file mode 100644 index 0000000..b73a32d --- /dev/null +++ b/event.go @@ -0,0 +1,35 @@ +package main + +import ( + "log" + "time" + + "github.com/r3labs/sse" +) + +func publishEvent(id string, data []byte) { + + // id := svr.getID() + + // data, err := json.Marshal(svr) + // if err != nil { + // panic(err) + // } + + //fmt.Printf("mydata = %+v\n", string(data)) + + // Create a new stream + log.Println("Creating stream for", id) + ss.CreateStream(id) + + ss.Publish(id, data) + + time.Sleep(10 * time.Millisecond) + // Remove a new stream when the build completes + log.Println("Closing stream for", id) + go func(s *sse.Server) { + ss.RemoveStream(id) + }(ss) + + // publishMessage(notification.getServiceID(), &nm) +} diff --git a/main.go b/main.go index 1abe54c..8c922ac 100644 --- a/main.go +++ b/main.go @@ -13,8 +13,8 @@ import ( "github.com/r3labs/sse" ) -var n *nats.Conn -var s *sse.Server +var nc *nats.Conn +var ss *sse.Server var host string var port string var secret string @@ -22,74 +22,51 @@ var err error func main() { setup() - defer n.Close() + defer nc.Close() // Create new SSE server - s = sse.New() - s.AutoStream = true - s.EncodeBase64 = true - defer s.Close() + ss = sse.New() + ss.AutoStream = true + ss.EncodeBase64 = true + defer ss.Close() // Create new HTTP Server and add the route handler mux := http.NewServeMux() mux.HandleFunc("/events", authMiddleware) - // Start nats handler, subscribe to all events related with the monitor - _, err = n.Subscribe("monitor.user", natsHandler) - if err != nil { - log.Println(err) - return + // Subscribe to service events + serviceSubjects := []string{ + "service.create", + "service.delete", + "service.import", } - _, err = n.Subscribe("service.create", natsHandler) - if err != nil { - log.Println(err) - return - } - _, err = n.Subscribe("service.delete", natsHandler) - if err != nil { - log.Println(err) - return - } - _, err = n.Subscribe("service.create.done", natsHandler) - if err != nil { - log.Println(err) - return - } - _, err = n.Subscribe("service.create.error", natsHandler) - if err != nil { - log.Println(err) - return - } - _, err = n.Subscribe("service.delete.done", natsHandler) - if err != nil { - log.Println(err) - return - } - _, err = n.Subscribe("service.delete.error", natsHandler) - if err != nil { - log.Println(err) - return - } - _, err = n.Subscribe("service.import.done", natsHandler) - if err != nil { - log.Println(err) - return - } - _, err = n.Subscribe("service.import.error", natsHandler) - if err != nil { - log.Println(err) - return + + for _, s := range serviceSubjects { + _, err = nc.Subscribe(s, serviceHandler) + if err != nil { + log.Println(err) + return + } } - _, err = n.Subscribe("*.*.*", genericHandler) - if err != nil { - log.Println(err) - return + // Subscribe to component events + componentSubjects := []string{ + "*.create.*", + "*.create.*.*", + "*.update.*", + "*.update.*.*", + "*.delete.*", + "*.delete.*.*", + "*.find.*", + "*.find.*.*", } - _, err = n.Subscribe("*.*.*.*", genericHandler) - if err != nil { - log.Println(err) - return + + for _, s := range componentSubjects { + _, err = nc.Subscribe(s, componentHandler) + if err != nil { + log.Println(err) + return + } } // Start Listening diff --git a/nats.go b/nats.go index fc58b98..a1f7c82 100644 --- a/nats.go +++ b/nats.go @@ -4,45 +4,12 @@ package main -import ( - "github.com/nats-io/nats" - "github.com/r3labs/sse" - "log" - "time" -) - -func natsHandler(msg *nats.Msg) { - var notification Notification - if err := processNotification(¬ification, msg); err != nil { - return - } - - switch msg.Subject { - case "monitor.user": - // Publish messages to subscribers - for _, nm := range notification.Messages { - publishMessage(notification.getServiceID(), &nm) - } - case "service.create", "service.delete": - var handler Service - // Create a new stream - log.Println("Creating stream for", notification.getServiceID()) - s.CreateStream(notification.getServiceID()) - lines := handler.Handle(msg.Subject, notification.Messages) - for _, nm := range lines { - publishMessage(notification.getServiceID(), &nm) - } - case "service.create.done", "service.create.error", "service.delete.done", "service.delete.error", "service.import.done", "service.import.error": - var handler Service - lines := handler.Handle(msg.Subject, notification.Messages) - for _, nm := range lines { - publishMessage(notification.getServiceID(), &nm) - } - time.Sleep(10 * time.Millisecond) - // Remove a new stream when the build completes - log.Println("Closing stream for", notification.getServiceID()) - go func(s *sse.Server) { - s.RemoveStream(notification.getServiceID()) - }(s) - } -} +// func natsHandler(msg *nats.Msg) { +// if strings.HasPrefix(msg.Subject, "service.") { +// fmt.Println("hit service") +// processService(msg) +// } else { +// fmt.Println("hit component") +// processComponent(msg) +// } +// } diff --git a/notification.go b/notification.go index b544df0..1969b04 100644 --- a/notification.go +++ b/notification.go @@ -22,7 +22,7 @@ type Message struct { Level string `json:"level"` } -// Notification stores any user output sent from the FSM +// Notification stores any user output sent from the Scheduler type Notification struct { ID string `json:"id"` Service string `json:"service"` diff --git a/service.go b/service.go new file mode 100644 index 0000000..777a969 --- /dev/null +++ b/service.go @@ -0,0 +1,40 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package main + +import ( + "encoding/json" + "strings" + + "github.com/nats-io/nats" +) + +type Service struct { + ID string `json:"id"` + Changes []Component `json:"changes"` +} + +func serviceHandler(msg *nats.Msg) { + var s Service + if err := json.Unmarshal(msg.Data, &s); err != nil { + panic(err) + } + + id := s.getID() + + data, err := json.Marshal(s) + if err != nil { + panic(err) + } + + publishEvent(id, data) +} + +func (s *Service) getID() string { + var pieces []string + pieces = strings.Split(s.ID, "-") + + return pieces[len(pieces)-1] +} diff --git a/setup.go b/setup.go index d9bef62..6ee46f0 100644 --- a/setup.go +++ b/setup.go @@ -21,7 +21,7 @@ type monitorConfig struct { func setup() { var err error // Open Nats connection - n, err = nats.Connect(os.Getenv("NATS_URI")) + nc, err = nats.Connect(os.Getenv("NATS_URI")) if err != nil { log.Println("Could not connect to nats") return @@ -29,7 +29,7 @@ func setup() { secret = os.Getenv("JWT_SECRET") if secret == "" { - token, err := n.Request("config.get.jwt_token", []byte(""), 1*time.Second) + token, err := nc.Request("config.get.jwt_token", []byte(""), 1*time.Second) if err != nil { panic("Can't get jwt_config config") } @@ -38,7 +38,7 @@ func setup() { } cfg := monitorConfig{} - msg, err := n.Request("config.get.monitor", []byte(""), 1*time.Second) + msg, err := nc.Request("config.get.monitor", []byte(""), 1*time.Second) if err != nil { panic("Can't get monitor config") }