Skip to content

Commit

Permalink
CLI flush action
Browse files Browse the repository at this point in the history
  • Loading branch information
localhots committed Sep 28, 2014
1 parent a305f17 commit deda490
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
13 changes: 13 additions & 0 deletions cli/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -89,6 +90,18 @@ func main() {
}
},
},
{
Name: "flush",
Usage: "Flush all messages from given queues",
Action: func(c *cli.Context) {
if msgs := bsq.Flush(c.Args()...); msgs != nil {
jsn, _ := json.Marshal(msgs)
fmt.Println(string(jsn))
} else {
fmt.Printf("Failed to flush queues %s\n", strings.Join(c.Args(), ", "))
}
},
},
{
Name: "status",
Usage: "Show server status",
Expand Down
15 changes: 8 additions & 7 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,32 +103,33 @@ func (c *Client) Subscribe(queues ...string) *Message {
}
}

func (c *Client) Flush(queues ...string) (messages []*Message) {
func (c *Client) Flush(queues ...string) []*Message {
url := c.url(flushEndpoint, "?queues=", strings.Join(queues, ","))
_, body := c.get(url)

var tmp []map[string]string
if err := json.Unmarshal(body, &tmp); err != nil {
return
return nil
}

messages = []*Message{}
messages := []*Message{}
for _, msg := range tmp {
messages = append(messages, &Message{msg["queue"], []byte(msg["message"])})
}

return
return messages
}

func (c *Client) Status() (stat []*QueueInfo) {
func (c *Client) Status() []*QueueInfo {
url := c.url(statusEndpoit)
_, body := c.get(url)

tmp := make(map[string]map[string]int)
if err := json.Unmarshal(body, &tmp); err != nil {
return
return nil
}

stat := []*QueueInfo{}
for queue, info := range tmp {
qi := &QueueInfo{
Name: queue,
Expand All @@ -138,7 +139,7 @@ func (c *Client) Status() (stat []*QueueInfo) {
stat = append(stat, qi)
}

return
return stat
}

func (c *Client) Debug() *DebugInfo {
Expand Down

0 comments on commit deda490

Please sign in to comment.