-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dev/rsearch #1
base: master
Are you sure you want to change the base?
Dev/rsearch #1
Changes from 1 commit
827d305
e14b248
5c70454
c67a610
0fd256a
ef6b920
d8a6dd9
f6df624
874e9ee
e165063
370dea5
7a4d9c9
35a61a8
f4f5939
4499626
44019a4
0edcdcd
f7f3702
4d62a2e
b87a4cc
773d8d6
6e60337
00be796
dc11e27
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package main | ||
|
||
import ( | ||
"github.com/romana/search" | ||
"flag" | ||
"log" | ||
"fmt" | ||
// "net/http" | ||
// "encoding/json" | ||
|
||
// "io" | ||
) | ||
|
||
|
||
func main() { | ||
var cfgFile = flag.String("c", "", "Kubernetes reverse search config file") | ||
var server = flag.Bool("s", false, "Start a server") | ||
var searchTag = flag.String("r", "", "Search resources by tag") | ||
flag.Parse() | ||
|
||
done := make(chan search.Done) | ||
|
||
config, err := search.NewConfig(*cfgFile); | ||
if err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could go directly below the point where |
||
fmt.Printf("Can not read config file %s, %s\n", *cfgFile, err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some things use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only But maybe i was a bit less careful with main.go since it's just using the library. |
||
return | ||
} | ||
|
||
if *server { | ||
fmt.Println("Starting server") | ||
nsUrl := fmt.Sprintf("%s/%s", config.Api.Url, config.Api.NamespaceUrl) | ||
nsEvents, err := search.NsWatch(done, nsUrl) | ||
if err != nil { | ||
log.Fatal("Namespace watcher failed to start", err) | ||
} | ||
|
||
events := search.Conductor(nsEvents, done, config) | ||
req, resp := search.Process(events, done, config) | ||
log.Println("All routines started") | ||
search.Serve(config, req, resp) | ||
} else if len(*searchTag) > 0 { | ||
if config.Server.Debug { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And a server. |
||
fmt.Println("Making request t the server") | ||
} | ||
r := search.SearchResource(config, search.SearchRequest{*searchTag}) | ||
fmt.Println(r) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And if it were neither? |
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
#!/bin/bash | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nothing else has a build.sh, so this is a bit odd. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ohh that's just a helper for myself - shouldn't be in the repo. |
||
find . -type d | while read line; do | ||
( echo Building in the $line ....; cd $line && go build .) | ||
done |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
package search | ||
|
||
import ( | ||
"net/http" | ||
"encoding/json" | ||
"fmt" | ||
"log" | ||
"bytes" | ||
) | ||
|
||
func SearchResource(config Config, req SearchRequest) SearchResponse { | ||
url := "http://localhost:" + config.Server.Port | ||
data := []byte(`{ "tag" : "` + req.Tag + `"}`) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Something's happening with data, but I have no idea what's going on. Some comments would be really useful. |
||
if config.Server.Debug { | ||
log.Println("Making request with", string(data)) | ||
} | ||
|
||
request, err := http.NewRequest("POST", url, bytes.NewBuffer(data)) | ||
request.Header.Set("Content-Type", "application/json") | ||
client := &http.Client{} | ||
response, err := client.Do(request) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. might as well use |
||
if err != nil { | ||
panic(err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not log.Fatal? This is dumping the error twice and a stack trace. |
||
} | ||
defer response.Body.Close() | ||
decoder := json.NewDecoder(response.Body) | ||
|
||
sr := SearchResponse{} | ||
|
||
if config.Server.Debug { | ||
log.Println("Trying to decode", response.Body) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
err = decoder.Decode(&sr) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
if config.Server.Debug { | ||
fmt.Println("Got response form a server", sr) | ||
} | ||
return sr | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
// Conductor is a goroutine that consumes NsEvents | ||
// and maintains a proper number of Resource goroutines | ||
package search | ||
|
||
func manageResources(ns NsEvent, terminators map[string]chan Done, config Config, out chan Event) { | ||
if ns.Type == "ADDED" { | ||
done := make(chan Done) | ||
terminators[ns.Object.Metadata.Uid] = done | ||
ns.Produce(out, terminators[ns.Object.Metadata.Uid], config) | ||
} else if ns.Type == "DELETED" { | ||
close(terminators[ns.Object.Metadata.Uid]) | ||
delete(terminators, ns.Object.Metadata.Uid) | ||
} | ||
} | ||
|
||
func Conductor(in <-chan NsEvent, done <-chan Done, config Config) <-chan Event { | ||
var terminators map[string]chan Done | ||
terminators = make(map[string]chan Done) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's wrong with make ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't about avoiding make, but avoiding 'var' and repetition of the type. |
||
|
||
ns := NsEvent{} | ||
out := make(chan Event) | ||
|
||
go func () { | ||
for { | ||
select { | ||
case ns = <- in: | ||
manageResources(ns, terminators, config, out) | ||
case <-done: | ||
return | ||
} | ||
} | ||
}() | ||
|
||
return out | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
package search | ||
|
||
import ( | ||
"gopkg.in/gcfg.v1" | ||
) | ||
|
||
type Done struct {} | ||
|
||
type Config struct { | ||
Resource Resource | ||
Server Server | ||
Api Api | ||
} | ||
|
||
type Server struct { | ||
Port string | ||
Debug bool | ||
} | ||
|
||
type Api struct { | ||
Url string | ||
NamespaceUrl string | ||
} | ||
|
||
type Resource struct { | ||
Name string | ||
Type string | ||
Selector string | ||
Namespaced string | ||
UrlPrefix string | ||
UrlPostfix string | ||
} | ||
|
||
func NewConfig(configFile string) (Config, error) { | ||
cfg := Config{} | ||
if err := gcfg.ReadFileInto(&cfg, configFile); err != nil { | ||
return cfg, err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Related to the other comment. What state is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had an issue where it wouldn't accept There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct, you'd have to return a zero-ed value of Config, ie: |
||
} | ||
|
||
return cfg, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
; comment | ||
[api] | ||
url=http://192.168.0.10:8080 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Couldn't this just be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Kube api url does change, namespace endpoint might not change but we want to be able to define api url separately as it's going to be used in few places. |
||
namespaceUrl=api/v1/namespaces/?watch=true | ||
|
||
[resource] | ||
;type=builtin # TBD | ||
type=3rdParty | ||
;namespaced=false # TBD | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we add some comments after or before each config item to explain what the config is for? For example, it's not clear what "namespaced" means. |
||
namespaced=true | ||
urlPrefix=apis/romana.io/demo/v1/namespaces | ||
urlPostfix=networkpolicys/?watch=true | ||
name=NetworkPolicy | ||
selector=podSelector | ||
|
||
[server] | ||
port=9700 | ||
debug=false |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
package search | ||
|
||
import ( | ||
"fmt" | ||
"log" | ||
"time" | ||
"net/http" | ||
"encoding/json" | ||
) | ||
|
||
/* | ||
{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"default","selfLink":"/api/v1/namespaces/default","uid":"d10db271-dc03-11e5-9c86-0213e1312dc5","resourceVersion":"6","creationTimestamp":"2016-02-25T21:07:45Z"},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}} | ||
*/ | ||
|
||
type NsEvent struct { | ||
Type string `json:"Type"` | ||
Object NsObject `json:"object"` | ||
} | ||
|
||
type NsObject struct { | ||
Kind string `json:"kind"` | ||
Spec Spec `json:"spec"` | ||
ApiVersion string `json:"apiVersion"` | ||
Metadata Metadata `json:"metadata"` | ||
Status map[string]string `json:"status"` | ||
} | ||
|
||
func NsWatch(done <-chan Done, url string) (<-chan NsEvent, error) { | ||
out := make(chan NsEvent) | ||
resp, err := http.Get(url) | ||
if err != nil { | ||
return out, err | ||
} | ||
tick := time.Tick(1 * time.Second) | ||
fmt.Println(resp.Body) | ||
dec := json.NewDecoder(resp.Body) | ||
var e NsEvent | ||
|
||
go func() { | ||
for { | ||
select { | ||
case <- tick: | ||
dec.Decode(&e) | ||
out <- e | ||
case <- done: | ||
return | ||
} | ||
} | ||
}() | ||
|
||
return out, nil | ||
} | ||
|
||
func (ns NsEvent) Produce(out chan Event, done <-chan Done, config Config) error { | ||
url := fmt.Sprintf("%s/%s/%s/%s", config.Api.Url, config.Resource.UrlPrefix, ns.Object.Metadata.Name, config.Resource.UrlPostfix) | ||
log.Println("Launching producer to listen on ", url) | ||
tick := time.Tick(1 * time.Second) | ||
|
||
resp, err := http.Get(url) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
dec := json.NewDecoder(resp.Body) | ||
var e Event | ||
go func() { | ||
for { | ||
select { | ||
case <- tick: | ||
dec.Decode(&e) | ||
out <- e | ||
case <- done: | ||
return | ||
} | ||
} | ||
}() | ||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
package search | ||
|
||
import ( | ||
"net/http" | ||
"gopkg.in/gcfg.v1" | ||
"testing" | ||
"math/rand" | ||
"time" | ||
"log" | ||
"fmt" | ||
) | ||
|
||
const cfgStr = ` | ||
; comment | ||
[api] | ||
url=http://127.0.0.1 | ||
namespaceUrl=api/v1/namespaces | ||
|
||
[resource] | ||
type=3rdParty | ||
namespaced=true | ||
urlPrefix=apis/romana.io/demo/v1/namespaces | ||
urlPostfix=networkpolicys | ||
name=NetworkPolicy | ||
selector=podSelector | ||
|
||
[server] | ||
port=9700 | ||
debug=false | ||
` | ||
|
||
const testNs = `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"default","selfLink":"/api/v1/namespaces/default","uid":"d10db271-dc03-11e5-9c86-0213e1312dc5","resourceVersion":"6","creationTimestamp":"2016-02-25T21:07:45Z"},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}} | ||
` | ||
|
||
const testPolicy = `{"type":"ADDED","object":{"apiVersion":"romana.io/demo/v1","kind":"NetworkPolicy","metadata":{"name":"pol1","namespace":"default","selfLink":"/apis/romana.io/demo/v1/namespaces/default/networkpolicys/pol1","uid":"d7036130-e119-11e5-aab8-0213e1312dc5","resourceVersion":"119875","creationTimestamp":"2016-03-03T08:28:00Z","labels":{"owner":"t1"}},"spec":{"allowIncoming":{"from":[{"pods":{"tier":"frontend"}}],"toPorts":[{"port":80,"protocol":"TCP"}]},"podSelector":{"tier":"backend"}}}} | ||
` | ||
|
||
func fakeNsHandler(w http.ResponseWriter, req *http.Request) { | ||
fmt.Fprint(w, testNs) | ||
} | ||
|
||
func fakePolicyHandler(w http.ResponseWriter, req *http.Request) { | ||
fmt.Fprint(w, testPolicy) | ||
} | ||
|
||
func fakeServer(config Config) { | ||
http.HandleFunc("/" + config.Api.NamespaceUrl, fakeNsHandler) | ||
policyUrl := "/" + config.Resource.UrlPrefix + "/default/" + config.Resource.UrlPostfix | ||
http.HandleFunc(policyUrl, fakePolicyHandler) | ||
log.Fatal(http.ListenAndServe(config.Api.Url[7:], nil)) | ||
} | ||
|
||
// Testing ability of nswatch to watch ns events | ||
func TestNsWatch(t *testing.T) { | ||
config := Config{} | ||
err := gcfg.ReadStringInto(&config, cfgStr) | ||
if err != nil { | ||
t.Errorf("Failed to parse gcfg data: %s", err) | ||
} | ||
|
||
r := rand.New(rand.NewSource(time.Now().UnixNano())) | ||
port := 50000 + r.Intn(1000) | ||
config.Api.Url = fmt.Sprintf("%s:%d", config.Api.Url, port) | ||
|
||
go fakeServer(config) | ||
time.Sleep(time.Duration(5 * time.Second)) | ||
|
||
done := make(chan Done) | ||
url := config.Api.Url + "/" + config.Api.NamespaceUrl | ||
nsEvents, err := NsWatch(done, url) | ||
if err != nil { | ||
t.Error("Namespace watcher failed to start", err) | ||
} | ||
|
||
ns := <- nsEvents | ||
if ns.Object.Metadata.Name != "default" { | ||
t.Error("Expecting namespace name = default, got =", ns.Object.Metadata.Name) | ||
} | ||
|
||
events := make(chan Event) | ||
err = ns.Produce(events, done, config) | ||
if err != nil { | ||
t.Error("Namespace producer failed to start", err) | ||
} | ||
|
||
e := <- events | ||
if e.Object.Metadata.Name != "pol1" { | ||
t.Error("Expecting policy name = pol1, got =", e.Object.Metadata.Name) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why isn't this just a server, and a separate tool for a client?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra package to maintain. Extra package to push.