Skip to content

Commit

Permalink
Remove output formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
g3kk0 committed Jul 10, 2017
1 parent 480451d commit 0d705d0
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 136 deletions.
2 changes: 1 addition & 1 deletion auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,5 @@ func authMiddleware(w http.ResponseWriter, r *http.Request) {
}

// Pass to sse server
s.HTTPHandler(w, r)
ss.HTTPHandler(w, r)
}
60 changes: 30 additions & 30 deletions c_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,34 @@
package main

// Service : ...
type Service struct {
}
//type Service struct {
//}

// Handle : ...
func (n *Service) Handle(subject string, lines []Message) []Message {
switch subject {
case "service.create":
lines = append(lines, Message{Body: "Applying your definition", Level: "INFO"})
case "service.delete":
lines = append(lines, Message{Body: "Starting environment deletion", Level: "INFO"})
case "service.create.done":
lines = append(lines, Message{Body: "SUCCESS: rules successfully applied", Level: "SUCCESS"})
lines = append(lines, Message{Body: "error", Level: "ERROR"})
case "service.create.error":
lines = append(lines, Message{Body: "\nOops! Something went wrong. Please manually fix any errors shown above and re-apply your definition.", Level: "INFO"})
lines = append(lines, Message{Body: "error", Level: "ERROR"})
case "service.delete.done":
lines = append(lines, Message{Body: "SUCCESS: your environment has been successfully deleted", Level: "SUCCESS"})
lines = append(lines, Message{Body: "success", Level: "SUCCESS"})
case "service.delete.error":
lines = append(lines, Message{Body: "\nOops! Something went wrong. Please manually fix any errors shown above and re-apply your service deletion.", Level: "INFO"})
lines = append(lines, Message{Body: "error", Level: "ERROR"})
case "service.import.done":
lines = append(lines, Message{Body: "SUCCESS: service successfully imported", Level: "SUCCESS"})
lines = append(lines, Message{Body: "error", Level: "ERROR"})
case "service.import.error":
lines = append(lines, Message{Body: "\nOops! Something went wrong. Please manually fix any errors shown above and re-apply your definition.", Level: "INFO"})
lines = append(lines, Message{Body: "error", Level: "ERROR"})
}
return lines
}
// // Handle : ...
// func (n *Service) Handle(subject string, lines []Message) []Message {
// switch subject {
// case "service.create":
// lines = append(lines, Message{Body: "Applying your definition", Level: "INFO"})
// case "service.delete":
// lines = append(lines, Message{Body: "Starting environment deletion", Level: "INFO"})
// case "service.create.done":
// lines = append(lines, Message{Body: "SUCCESS: rules successfully applied", Level: "SUCCESS"})
// lines = append(lines, Message{Body: "error", Level: "ERROR"})
// case "service.create.error":
// lines = append(lines, Message{Body: "\nOops! Something went wrong. Please manually fix any errors shown above and re-apply your definition.", Level: "INFO"})
// lines = append(lines, Message{Body: "error", Level: "ERROR"})
// case "service.delete.done":
// lines = append(lines, Message{Body: "SUCCESS: your environment has been successfully deleted", Level: "SUCCESS"})
// lines = append(lines, Message{Body: "success", Level: "SUCCESS"})
// case "service.delete.error":
// lines = append(lines, Message{Body: "\nOops! Something went wrong. Please manually fix any errors shown above and re-apply your service deletion.", Level: "INFO"})
// lines = append(lines, Message{Body: "error", Level: "ERROR"})
// case "service.import.done":
// lines = append(lines, Message{Body: "SUCCESS: service successfully imported", Level: "SUCCESS"})
// lines = append(lines, Message{Body: "error", Level: "ERROR"})
// case "service.import.error":
// lines = append(lines, Message{Body: "\nOops! Something went wrong. Please manually fix any errors shown above and re-apply your definition.", Level: "INFO"})
// lines = append(lines, Message{Body: "error", Level: "ERROR"})
// }
// return lines
// }
46 changes: 46 additions & 0 deletions component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

package main

import (
"encoding/json"
"strings"

"github.com/nats-io/nats"
)

type Component struct {
ID string `json:"_component_id"`
Type string `json:"_component"`
State string `json:"_state"`
Action string `json:"_action"`
Provider string `json:"_provider"`
Name string `json:"name"`
Error string `json:"error,omitempty"`
Service string `json:"service,omitempty"`
}

func componentHandler(msg *nats.Msg) {
var c Component
if err := json.Unmarshal(msg.Data, &c); err != nil {
panic(err)
}

id := c.getID()

data, err := json.Marshal(c)
if err != nil {
panic(err)
}

publishEvent(id, data)
}

func (c *Component) getID() string {
var pieces []string
pieces = strings.Split(c.Service, "-")

return pieces[len(pieces)-1]
}
35 changes: 35 additions & 0 deletions event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"log"
"time"

"github.com/r3labs/sse"
)

func publishEvent(id string, data []byte) {

// id := svr.getID()

// data, err := json.Marshal(svr)
// if err != nil {
// panic(err)
// }

//fmt.Printf("mydata = %+v\n", string(data))

// Create a new stream
log.Println("Creating stream for", id)
ss.CreateStream(id)

ss.Publish(id, data)

time.Sleep(10 * time.Millisecond)
// Remove a new stream when the build completes
log.Println("Closing stream for", id)
go func(s *sse.Server) {
ss.RemoveStream(id)
}(ss)

// publishMessage(notification.getServiceID(), &nm)
}
95 changes: 36 additions & 59 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,83 +13,60 @@ import (
"github.com/r3labs/sse"
)

var n *nats.Conn
var s *sse.Server
var nc *nats.Conn
var ss *sse.Server
var host string
var port string
var secret string
var err error

func main() {
setup()
defer n.Close()
defer nc.Close()

// Create new SSE server
s = sse.New()
s.AutoStream = true
s.EncodeBase64 = true
defer s.Close()
ss = sse.New()
ss.AutoStream = true
ss.EncodeBase64 = true
defer ss.Close()

// Create new HTTP Server and add the route handler
mux := http.NewServeMux()
mux.HandleFunc("/events", authMiddleware)

// Start nats handler, subscribe to all events related with the monitor
_, err = n.Subscribe("monitor.user", natsHandler)
if err != nil {
log.Println(err)
return
// Subscribe to service events
serviceSubjects := []string{
"service.create",
"service.delete",
"service.import",
}
_, err = n.Subscribe("service.create", natsHandler)
if err != nil {
log.Println(err)
return
}
_, err = n.Subscribe("service.delete", natsHandler)
if err != nil {
log.Println(err)
return
}
_, err = n.Subscribe("service.create.done", natsHandler)
if err != nil {
log.Println(err)
return
}
_, err = n.Subscribe("service.create.error", natsHandler)
if err != nil {
log.Println(err)
return
}
_, err = n.Subscribe("service.delete.done", natsHandler)
if err != nil {
log.Println(err)
return
}
_, err = n.Subscribe("service.delete.error", natsHandler)
if err != nil {
log.Println(err)
return
}
_, err = n.Subscribe("service.import.done", natsHandler)
if err != nil {
log.Println(err)
return
}
_, err = n.Subscribe("service.import.error", natsHandler)
if err != nil {
log.Println(err)
return

for _, s := range serviceSubjects {
_, err = nc.Subscribe(s, serviceHandler)
if err != nil {
log.Println(err)
return
}
}

_, err = n.Subscribe("*.*.*", genericHandler)
if err != nil {
log.Println(err)
return
// Subscribe to component events
componentSubjects := []string{
"*.create.*",
"*.create.*.*",
"*.update.*",
"*.update.*.*",
"*.delete.*",
"*.delete.*.*",
"*.find.*",
"*.find.*.*",
}
_, err = n.Subscribe("*.*.*.*", genericHandler)
if err != nil {
log.Println(err)
return

for _, s := range componentSubjects {
_, err = nc.Subscribe(s, componentHandler)
if err != nil {
log.Println(err)
return
}
}

// Start Listening
Expand Down
51 changes: 9 additions & 42 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,12 @@

package main

import (
"github.com/nats-io/nats"
"github.com/r3labs/sse"
"log"
"time"
)

func natsHandler(msg *nats.Msg) {
var notification Notification
if err := processNotification(&notification, msg); err != nil {
return
}

switch msg.Subject {
case "monitor.user":
// Publish messages to subscribers
for _, nm := range notification.Messages {
publishMessage(notification.getServiceID(), &nm)
}
case "service.create", "service.delete":
var handler Service
// Create a new stream
log.Println("Creating stream for", notification.getServiceID())
s.CreateStream(notification.getServiceID())
lines := handler.Handle(msg.Subject, notification.Messages)
for _, nm := range lines {
publishMessage(notification.getServiceID(), &nm)
}
case "service.create.done", "service.create.error", "service.delete.done", "service.delete.error", "service.import.done", "service.import.error":
var handler Service
lines := handler.Handle(msg.Subject, notification.Messages)
for _, nm := range lines {
publishMessage(notification.getServiceID(), &nm)
}
time.Sleep(10 * time.Millisecond)
// Remove a new stream when the build completes
log.Println("Closing stream for", notification.getServiceID())
go func(s *sse.Server) {
s.RemoveStream(notification.getServiceID())
}(s)
}
}
// func natsHandler(msg *nats.Msg) {
// if strings.HasPrefix(msg.Subject, "service.") {
// fmt.Println("hit service")
// processService(msg)
// } else {
// fmt.Println("hit component")
// processComponent(msg)
// }
// }
2 changes: 1 addition & 1 deletion notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Message struct {
Level string `json:"level"`
}

// Notification stores any user output sent from the FSM
// Notification stores any user output sent from the Scheduler
type Notification struct {
ID string `json:"id"`
Service string `json:"service"`
Expand Down
40 changes: 40 additions & 0 deletions service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

package main

import (
"encoding/json"
"strings"

"github.com/nats-io/nats"
)

type Service struct {
ID string `json:"id"`
Changes []Component `json:"changes"`
}

func serviceHandler(msg *nats.Msg) {
var s Service
if err := json.Unmarshal(msg.Data, &s); err != nil {
panic(err)
}

id := s.getID()

data, err := json.Marshal(s)
if err != nil {
panic(err)
}

publishEvent(id, data)
}

func (s *Service) getID() string {
var pieces []string
pieces = strings.Split(s.ID, "-")

return pieces[len(pieces)-1]
}
Loading

0 comments on commit 0d705d0

Please sign in to comment.