Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev/rsearch #1

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added rsearch/bin/bin
Binary file not shown.
49 changes: 49 additions & 0 deletions rsearch/bin/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"github.com/romana/search"
"flag"
"log"
"fmt"
// "net/http"
// "encoding/json"

// "io"
)


func main() {
var cfgFile = flag.String("c", "", "Kubernetes reverse search config file")
var server = flag.Bool("s", false, "Start a server")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't this just a server, and a separate tool for a client?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra package to maintain. Extra package to push.

var searchTag = flag.String("r", "", "Search resources by tag")
flag.Parse()

done := make(chan search.Done)

config, err := search.NewConfig(*cfgFile);
if err != nil {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could go directly below the point where err was assigned.

fmt.Printf("Can not read config file %s, %s\n", *cfgFile, err)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some things use fmt, others use log. Is there a reason for the difference?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only bin/main.go uses fmt. Package itself uses log.

But maybe i was a bit less careful with main.go since it's just using the library.

return
}

if *server {
fmt.Println("Starting server")
nsUrl := fmt.Sprintf("%s/%s", config.Api.Url, config.Api.NamespaceUrl)
nsEvents, err := search.NsWatch(done, nsUrl)
if err != nil {
log.Fatal("Namespace watcher failed to start", err)
}

events := search.Conductor(nsEvents, done, config)
req, resp := search.Process(events, done, config)
log.Println("All routines started")
search.Serve(config, req, resp)
} else if len(*searchTag) > 0 {
if config.Server.Debug {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

config.Server.Debug is controlling the debug level of a client?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And a server.

fmt.Println("Making request t the server")
}
r := search.SearchResource(config, search.SearchRequest{*searchTag})
fmt.Println(r)
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And if it were neither?


}
5 changes: 5 additions & 0 deletions rsearch/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing else has a build.sh, so this is a bit odd.
And because of Godeps, it should probably munge the GOPATH to include the Godeps/_workspace path before running go build.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh that's just a helper for myself - shouldn't be in the repo.

find . -type d | while read line; do
( echo Building in the $line ....; cd $line && go build .)
done
42 changes: 42 additions & 0 deletions rsearch/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package search

import (
"net/http"
"encoding/json"
"fmt"
"log"
"bytes"
)

func SearchResource(config Config, req SearchRequest) SearchResponse {
url := "http://localhost:" + config.Server.Port
data := []byte(`{ "tag" : "` + req.Tag + `"}`)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something's happening with data, but I have no idea what's going on. Some comments would be really useful.

if config.Server.Debug {
log.Println("Making request with", string(data))
}

request, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
request.Header.Set("Content-Type", "application/json")
client := &http.Client{}
response, err := client.Do(request)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might as well use http.DefaultClient if you're not doing any special setup of the client or transport.

if err != nil {
panic(err)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not log.Fatal? This is dumping the error twice and a stack trace.

}
defer response.Body.Close()
decoder := json.NewDecoder(response.Body)

sr := SearchResponse{}

if config.Server.Debug {
log.Println("Trying to decode", response.Body)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

response.Body won't be a stringified version of the response, but some arbitrary pointers and struct values like &{0xc8201040e0 {0 0} false <nil> 0xd4840 0xd47e0}

}
err = decoder.Decode(&sr)
if err != nil {
panic(err)
}

if config.Server.Debug {
fmt.Println("Got response form a server", sr)
}
return sr
}
35 changes: 35 additions & 0 deletions rsearch/conductor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Conductor is a goroutine that consumes NsEvents
// and maintains a proper number of Resource goroutines
package search

func manageResources(ns NsEvent, terminators map[string]chan Done, config Config, out chan Event) {
if ns.Type == "ADDED" {
done := make(chan Done)
terminators[ns.Object.Metadata.Uid] = done
ns.Produce(out, terminators[ns.Object.Metadata.Uid], config)
} else if ns.Type == "DELETED" {
close(terminators[ns.Object.Metadata.Uid])
delete(terminators, ns.Object.Metadata.Uid)
}
}

func Conductor(in <-chan NsEvent, done <-chan Done, config Config) <-chan Event {
var terminators map[string]chan Done
terminators = make(map[string]chan Done)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

terminators := map[string]chan Done{}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's wrong with make ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't about avoiding make, but avoiding 'var' and repetition of the type.


ns := NsEvent{}
out := make(chan Event)

go func () {
for {
select {
case ns = <- in:
manageResources(ns, terminators, config, out)
case <-done:
return
}
}
}()

return out
}
41 changes: 41 additions & 0 deletions rsearch/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package search

import (
"gopkg.in/gcfg.v1"
)

type Done struct {}

type Config struct {
Resource Resource
Server Server
Api Api
}

type Server struct {
Port string
Debug bool
}

type Api struct {
Url string
NamespaceUrl string
}

type Resource struct {
Name string
Type string
Selector string
Namespaced string
UrlPrefix string
UrlPostfix string
}

func NewConfig(configFile string) (Config, error) {
cfg := Config{}
if err := gcfg.ReadFileInto(&cfg, configFile); err != nil {
return cfg, err

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to the other comment. What state is cfg in at the point an error occurs? Unchanged or partially changed? Is it appropriate to return it?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had an issue where it wouldn't accept nil for cfg in return.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, you'd have to return a zero-ed value of Config, ie: return Config{}, err
That'd be marginally better than returning a potentially invalid value of cfg (or changing it to *Config so you can return nil)
The caller should be checking err anyway.

}

return cfg, nil
}
18 changes: 18 additions & 0 deletions rsearch/config.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
; comment
[api]
url=http://192.168.0.10:8080

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't this just be watchURL as a single item? Does it actually vary between deployments?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kube api url does change, namespace endpoint might not change but we want to be able to define api url separately as it's going to be used in few places.

namespaceUrl=api/v1/namespaces/?watch=true

[resource]
;type=builtin # TBD
type=3rdParty
;namespaced=false # TBD

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add some comments after or before each config item to explain what the config is for? For example, it's not clear what "namespaced" means.

namespaced=true
urlPrefix=apis/romana.io/demo/v1/namespaces
urlPostfix=networkpolicys/?watch=true
name=NetworkPolicy
selector=podSelector

[server]
port=9700
debug=false
79 changes: 79 additions & 0 deletions rsearch/namespace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package search

import (
"fmt"
"log"
"time"
"net/http"
"encoding/json"
)

/*
{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"default","selfLink":"/api/v1/namespaces/default","uid":"d10db271-dc03-11e5-9c86-0213e1312dc5","resourceVersion":"6","creationTimestamp":"2016-02-25T21:07:45Z"},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}
*/

type NsEvent struct {
Type string `json:"Type"`
Object NsObject `json:"object"`
}

type NsObject struct {
Kind string `json:"kind"`
Spec Spec `json:"spec"`
ApiVersion string `json:"apiVersion"`
Metadata Metadata `json:"metadata"`
Status map[string]string `json:"status"`
}

func NsWatch(done <-chan Done, url string) (<-chan NsEvent, error) {
out := make(chan NsEvent)
resp, err := http.Get(url)
if err != nil {
return out, err
}
tick := time.Tick(1 * time.Second)
fmt.Println(resp.Body)
dec := json.NewDecoder(resp.Body)
var e NsEvent

go func() {
for {
select {
case <- tick:
dec.Decode(&e)
out <- e
case <- done:
return
}
}
}()

return out, nil
}

func (ns NsEvent) Produce(out chan Event, done <-chan Done, config Config) error {
url := fmt.Sprintf("%s/%s/%s/%s", config.Api.Url, config.Resource.UrlPrefix, ns.Object.Metadata.Name, config.Resource.UrlPostfix)
log.Println("Launching producer to listen on ", url)
tick := time.Tick(1 * time.Second)

resp, err := http.Get(url)
if err != nil {
return err
}

dec := json.NewDecoder(resp.Body)
var e Event
go func() {
for {
select {
case <- tick:
dec.Decode(&e)
out <- e
case <- done:
return
}
}
}()

return nil
}
90 changes: 90 additions & 0 deletions rsearch/namespace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package search

import (
"net/http"
"gopkg.in/gcfg.v1"
"testing"
"math/rand"
"time"
"log"
"fmt"
)

const cfgStr = `
; comment
[api]
url=http://127.0.0.1
namespaceUrl=api/v1/namespaces

[resource]
type=3rdParty
namespaced=true
urlPrefix=apis/romana.io/demo/v1/namespaces
urlPostfix=networkpolicys
name=NetworkPolicy
selector=podSelector

[server]
port=9700
debug=false
`

const testNs = `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"default","selfLink":"/api/v1/namespaces/default","uid":"d10db271-dc03-11e5-9c86-0213e1312dc5","resourceVersion":"6","creationTimestamp":"2016-02-25T21:07:45Z"},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}
`

const testPolicy = `{"type":"ADDED","object":{"apiVersion":"romana.io/demo/v1","kind":"NetworkPolicy","metadata":{"name":"pol1","namespace":"default","selfLink":"/apis/romana.io/demo/v1/namespaces/default/networkpolicys/pol1","uid":"d7036130-e119-11e5-aab8-0213e1312dc5","resourceVersion":"119875","creationTimestamp":"2016-03-03T08:28:00Z","labels":{"owner":"t1"}},"spec":{"allowIncoming":{"from":[{"pods":{"tier":"frontend"}}],"toPorts":[{"port":80,"protocol":"TCP"}]},"podSelector":{"tier":"backend"}}}}
`

func fakeNsHandler(w http.ResponseWriter, req *http.Request) {
fmt.Fprint(w, testNs)
}

func fakePolicyHandler(w http.ResponseWriter, req *http.Request) {
fmt.Fprint(w, testPolicy)
}

func fakeServer(config Config) {
http.HandleFunc("/" + config.Api.NamespaceUrl, fakeNsHandler)
policyUrl := "/" + config.Resource.UrlPrefix + "/default/" + config.Resource.UrlPostfix
http.HandleFunc(policyUrl, fakePolicyHandler)
log.Fatal(http.ListenAndServe(config.Api.Url[7:], nil))
}

// Testing ability of nswatch to watch ns events
func TestNsWatch(t *testing.T) {
config := Config{}
err := gcfg.ReadStringInto(&config, cfgStr)
if err != nil {
t.Errorf("Failed to parse gcfg data: %s", err)
}

r := rand.New(rand.NewSource(time.Now().UnixNano()))
port := 50000 + r.Intn(1000)
config.Api.Url = fmt.Sprintf("%s:%d", config.Api.Url, port)

go fakeServer(config)
time.Sleep(time.Duration(5 * time.Second))

done := make(chan Done)
url := config.Api.Url + "/" + config.Api.NamespaceUrl
nsEvents, err := NsWatch(done, url)
if err != nil {
t.Error("Namespace watcher failed to start", err)
}

ns := <- nsEvents
if ns.Object.Metadata.Name != "default" {
t.Error("Expecting namespace name = default, got =", ns.Object.Metadata.Name)
}

events := make(chan Event)
err = ns.Produce(events, done, config)
if err != nil {
t.Error("Namespace producer failed to start", err)
}

e := <- events
if e.Object.Metadata.Name != "pol1" {
t.Error("Expecting policy name = pol1, got =", e.Object.Metadata.Name)
}
}
Loading