diff --git a/rsearch/.gitignore b/rsearch/.gitignore new file mode 100644 index 0000000000..48b8bf9072 --- /dev/null +++ b/rsearch/.gitignore @@ -0,0 +1 @@ +vendor/ diff --git a/rsearch/Godeps/Godeps.json b/rsearch/Godeps/Godeps.json new file mode 100644 index 0000000000..e02015369b --- /dev/null +++ b/rsearch/Godeps/Godeps.json @@ -0,0 +1,23 @@ +{ + "ImportPath": "github.com/romana/contrib/rsearch", + "GoVersion": "go1.5", + "GodepVersion": "v58", + "Deps": [ + { + "ImportPath": "gopkg.in/gcfg.v1", + "Rev": "083575c3955c85df16fe9590cceab64d03f5eb6e" + }, + { + "ImportPath": "gopkg.in/gcfg.v1/scanner", + "Rev": "083575c3955c85df16fe9590cceab64d03f5eb6e" + }, + { + "ImportPath": "gopkg.in/gcfg.v1/token", + "Rev": "083575c3955c85df16fe9590cceab64d03f5eb6e" + }, + { + "ImportPath": "gopkg.in/gcfg.v1/types", + "Rev": "083575c3955c85df16fe9590cceab64d03f5eb6e" + } + ] +} diff --git a/rsearch/Godeps/Readme b/rsearch/Godeps/Readme new file mode 100644 index 0000000000..4cdaa53d56 --- /dev/null +++ b/rsearch/Godeps/Readme @@ -0,0 +1,5 @@ +This directory tree is generated automatically by godep. + +Please do not edit. + +See https://github.com/tools/godep for more information. diff --git a/rsearch/TODO b/rsearch/TODO new file mode 100644 index 0000000000..690b7859f2 --- /dev/null +++ b/rsearch/TODO @@ -0,0 +1,3 @@ +1. When starting server read through namespace list and fire up ns.Producer per namespace. + Read throug resource list per namespace and send them to Processor +2. Import Kubernetes and use Objects from there diff --git a/rsearch/bin/.gitignore b/rsearch/bin/.gitignore new file mode 100644 index 0000000000..ba077a4031 --- /dev/null +++ b/rsearch/bin/.gitignore @@ -0,0 +1 @@ +bin diff --git a/rsearch/bin/main.go b/rsearch/bin/main.go new file mode 100644 index 0000000000..f9ff2ba2e2 --- /dev/null +++ b/rsearch/bin/main.go @@ -0,0 +1,65 @@ +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + + search "github.com/romana/contrib/rsearch" +) + +func main() { + var cfgFile = flag.String("c", "", "Kubernetes reverse search config file") + var server = flag.Bool("server-mode", false, "Start a server") + var host = flag.String("host", "", "Host for client to connect to") + var proto = flag.String("protocol", "", "Protocol to use for client connect to") + var port = flag.String("port", "", "TCP port for client to connect to") + var searchTag = flag.String("r", "", "Search resources by tag") + flag.Parse() + + done := make(chan search.Done) + + config, err := search.NewConfig(*cfgFile) + if err != nil { + log.Fatalf("Can not read config file %s, %s\n", *cfgFile, err) + } + + if *host != "" { + config.Server.Host = *host + } + + if *proto != "" { + config.Server.Proto = *proto + } + + if *port != "" { + config.Server.Port = *port + } + + if *server { + log.Println("Starting server") + nsUrl := fmt.Sprintf("%s/%s", config.Api.Url, config.Api.NamespaceUrl) + nsEvents, err := search.NsWatch(done, nsUrl, config) + if err != nil { + log.Fatal("Namespace watcher failed to start", err) + } + + events := search.Conductor(nsEvents, done, config) + req := search.Process(events, done, config) + log.Println("All routines started") + search.Serve(config, req) + } else if len(*searchTag) > 0 { + if config.Server.Debug { + log.Println("Making request to the server") + } + r := search.SearchResource(config, search.SearchRequest{Tag: *searchTag}) + response, err := json.Marshal(r) + if err != nil { + log.Fatal("Failed to parse out server response, ", err) + } + fmt.Println(string(response)) + } else { + log.Fatal("Either -s or -r must be given") + } +} diff --git a/rsearch/build.sh b/rsearch/build.sh new file mode 100755 index 0000000000..46b802ea94 --- /dev/null +++ b/rsearch/build.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +find . -type d | while read line; do + ( echo Building in the $line ....; cd $line && go build .) +done diff --git a/rsearch/client.go b/rsearch/client.go new file mode 100644 index 0000000000..3b08e893a2 --- /dev/null +++ b/rsearch/client.go @@ -0,0 +1,65 @@ +// Copyright (c) 2016 Pani Networks +// All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package rsearch + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "net/http" +) + +// SearchResource makes HTTP request to the server +// instance if this library. +func SearchResource(config Config, req SearchRequest) SearchResponse { + // TODO need to make url configurable + url := fmt.Sprintf("%s://%s:%s", config.Server.Proto, config.Server.Host, config.Server.Port) + data := []byte(`{ "tag" : "` + req.Tag + `"}`) + if config.Server.Debug { + log.Printf("Making request with %s to the %s\n", string(data), url) + } + + // Making request. + request, err := http.NewRequest("POST", url, bytes.NewBuffer(data)) + if err != nil { + log.Fatal("Failed to connect to server", url) + } + + request.Header.Set("Content-Type", "application/json") + client := &http.Client{} + response, err := client.Do(request) + if err != nil { + log.Fatal("HTTP request failed", url, err) + } + + defer response.Body.Close() + decoder := json.NewDecoder(response.Body) + sr := SearchResponse{} + if config.Server.Debug { + log.Println("Trying to decode", response.Body) + } + err = decoder.Decode(&sr) + if err != nil { + log.Println("Failed to decode", response.Body) + panic(err) + } + + if config.Server.Debug { + fmt.Println("Decoded response form a server", sr) + } + return sr +} diff --git a/rsearch/conductor.go b/rsearch/conductor.go new file mode 100644 index 0000000000..0b3c7f46a2 --- /dev/null +++ b/rsearch/conductor.go @@ -0,0 +1,76 @@ +// Copyright (c) 2016 Pani Networks +// All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package rsearch + +import ( + "log" +) + +// manageResources manages map of termination channels and fires up new +// per-namespace gorotines when needed. +func manageResources(ns Event, terminators map[string]chan Done, config Config, out chan Event) { + uid := ns.Object.Metadata.Uid + if ns.Type == KubeEventAdded { + if _, ok := terminators[uid]; ok { + log.Println("Received ADDED event for uid that is already known, ignoring ", uid) + return + } + + done := make(chan Done) + terminators[uid] = done + ns.Object.Produce(out, terminators[uid], config) + } else if ns.Type == KubeEventDeleted { + if _, ok := terminators[uid]; !ok { + log.Println("Received DELETED event for uid that is not known, ignoring ", uid) + return + } + + close(terminators[uid]) + delete(terminators, uid) + } else if ns.Type == InternalEventDeleteAll { + for uid, c := range terminators { + close(c) + delete(terminators, uid) + } + } +} + +// Conductor manages a set of goroutines one per namespace. +func Conductor(in <-chan Event, done <-chan Done, config Config) <-chan Event { + // done in arguments is a channel that can be used to stop Conductor itsefl + // while map of Done's below is for terminating managed gorotines. + + // Idea of this map is to keep termination channels organized + // so when DELETED event occurs on a namespace it would be possible + // to terminater related goroutine. + terminators := map[string]chan Done{} + + ns := Event{} + out := make(chan Event) + + go func() { + for { + select { + case ns = <-in: + manageResources(ns, terminators, config, out) + case <-done: + return + } + } + }() + + return out +} diff --git a/rsearch/config.go b/rsearch/config.go new file mode 100644 index 0000000000..84aced0311 --- /dev/null +++ b/rsearch/config.go @@ -0,0 +1,65 @@ +// Copyright (c) 2016 Pani Networks +// All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package rsearch + +import ( + "gopkg.in/gcfg.v1" +) + +// Done is an alias for empty struct, used to make broadcast channels +// for terminating goroutines. +type Done struct{} + +// Config is a top level struct describing expected structure of config file. +type Config struct { + Resource Resource + Server Server + Api Api +} + +// Server is a config section describing server instance of this package. +type Server struct { + Port string + Host string + Proto string + Debug bool +} + +// API is a config section describing kubernetes API parameters. +type Api struct { + Url string + NamespaceUrl string +} + +// Resource is a config section describing kubernetes resource to be cached and searched for. +type Resource struct { + Name string + Type string + Selector string + Namespaced string + UrlPrefix string + UrlPostfix string +} + +// NewConfig parsing config file and returning initialized instance of Config structure. +func NewConfig(configFile string) (Config, error) { + cfg := Config{} + if err := gcfg.ReadFileInto(&cfg, configFile); err != nil { + return Config{}, err + } + + return cfg, nil +} diff --git a/rsearch/config.ini b/rsearch/config.ini new file mode 100755 index 0000000000..781b9dcf81 --- /dev/null +++ b/rsearch/config.ini @@ -0,0 +1,20 @@ +; comment +[api] +url=http://192.168.0.10:8080 +namespaceUrl=api/v1/namespaces/?watch=true + +[resource] +;type=builtin # TBD +type=3rdParty +;namespaced=false # TBD +namespaced=true +urlPrefix=apis/romana.io/demo/v1/namespaces +urlPostfix=networkpolicys/?watch=true +name=NetworkPolicy +selector=podSelector + +[server] +port=9700 +proto=http +host=localhost +debug=false diff --git a/rsearch/doc.go b/rsearch/doc.go new file mode 100644 index 0000000000..751ff73132 --- /dev/null +++ b/rsearch/doc.go @@ -0,0 +1,18 @@ +// Copyright (c) 2016 Pani Networks +// All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +// This package provides means of searching kubernets objects by their selectors. +// The goal is to give the answer to a question - which resources are selecting pods with given label +package rsearch diff --git a/rsearch/processor.go b/rsearch/processor.go new file mode 100644 index 0000000000..637800653b --- /dev/null +++ b/rsearch/processor.go @@ -0,0 +1,114 @@ +// Copyright (c) 2016 Pani Networks +// All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package rsearch + +import ( + "log" +) + +/* +{"type":"ADDED","object":{"apiVersion":"romana.io/demo/v1","kind":"NetworkPolicy","metadata":{"name":"pol1","namespace":"default","selfLink":"/apis/romana.io/demo/v1/namespaces/default/networkpolicys/pol1","uid":"d7036130-e119-11e5-aab8-0213e1312dc5","resourceVersion":"119875","creationTimestamp":"2016-03-03T08:28:00Z","labels":{"owner":"t1"}},"spec":{"allowIncoming":{"from":[{"pods":{"tier":"frontend"}}],"toPorts":[{"port":80,"protocol":"TCP"}]},"podSelector":{"tier":"backend"}}}} +*/ + +// Process is a goroutine that consumes resource update events and maintains a searchable +// cache of all known resources. It also accepts search requests and perform searches. +func Process(in <-chan Event, done chan Done, config Config) chan<- SearchRequest { + // Channel to submit SearchRequest's into. + req := make(chan SearchRequest) + + // storage map is a cache of known KubeObjects + // arranged by NPid. + storage := make(map[string]KubeObject) + + // search map is a cache of known NPid's + // arranged by Selectors, where selector being + // a field by which we search. + search := make(map[string]map[string]bool) + + go func() { + for { + select { + case e := <-in: + // On incoming event update caches. + updateStorage(e, storage, search, config) + case request := <-req: + // On incoming search request return a list + // of resources with matching Selectors. + processSearchRequest(storage, search, request, config) + case <-done: + return + } + } + }() + + return req +} + +// processSearchRequest looks up for KubeObjects with selector matching request.Tag and returns a list of matching objects. +func processSearchRequest(storage map[string]KubeObject, search map[string]map[string]bool, req SearchRequest, config Config) SearchResponse { + if config.Server.Debug { + log.Println("Received request", req) + } + + var resp []KubeObject + + if config.Server.Debug { + log.Printf("Index map has following %s, request tag is %s ", search, req.Tag) + } + + // Assembling response. + for NPid, _ := range search[string(req.Tag)] { + if config.Server.Debug { + log.Printf("Assembling response adding %s to %s", resp, storage[NPid]) + } + resp = append(resp, storage[NPid]) + } + + if config.Server.Debug { + log.Printf("Dispatching final response %s", resp) + } + + req.Resp <- resp // TODO see if it may hang up here + return resp +} + +// updateStorage maintaines up to date state of search and storage maps. +func updateStorage(e Event, storage map[string]KubeObject, search map[string]map[string]bool, config Config) { + NPid := e.Object.makeId() + Selector := e.Object.getSelector(config) + + if e.Type == KubeEventAdded { + if config.Server.Debug { + log.Printf("Processing ADD request for %s", e.Object.Metadata.Name) + } + storage[NPid] = e.Object + if _, ok := search[Selector]; !ok { + m := make(map[string]bool) + search[Selector] = m + } + search[Selector][NPid] = true + } else if e.Type == KubeEventDeleted { + if config.Server.Debug { + log.Printf("Processing DELETE request for %s", e.Object.Metadata.Name) + } + delete(storage, NPid) + delete(search[Selector], NPid) + } else { + if config.Server.Debug { + log.Printf("Received unindentified request %s for %s", e.Type, e.Object.Metadata.Name) + } + } +} diff --git a/rsearch/processor_test.go b/rsearch/processor_test.go new file mode 100644 index 0000000000..70443763b0 --- /dev/null +++ b/rsearch/processor_test.go @@ -0,0 +1,62 @@ +// Copyright (c) 2016 Pani Networks +// All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package rsearch + +import ( + "bytes" + "encoding/json" + "gopkg.in/gcfg.v1" + "testing" + "time" +) + +func TestResoureProcessor(t *testing.T) { + config := Config{} + err := gcfg.ReadStringInto(&config, cfgStr) + if err != nil { + t.Errorf("Failed to parse gcfg data: %s", err) + } + + done := make(chan Done) + events := make(chan Event) + + req := Process(events, done, config) + time.Sleep(time.Duration(1 * time.Second)) + + var e Event + policyReader := bytes.NewBufferString(testPolicy) + dec := json.NewDecoder(policyReader) + dec.Decode(&e) + + events <- e + + responseChannel := make(chan SearchResponse) + searchRequest := SearchRequest{Tag: "tier=backend#", Resp: responseChannel} + req <- searchRequest + + result, ok := <-searchRequest.Resp + if !ok { + t.Error("Response channel in SearchRequest object found unexpectedly closed") + } + + if len(result) == 0 { + t.Error("Search request is empty - expecting one result") + } + + if result[0].Metadata.Name != "pol1" { + t.Error("Unexpected search response = expect policy name = pol1, got ", result[0].Metadata.Name) + } +} diff --git a/rsearch/resources.go b/rsearch/resources.go new file mode 100644 index 0000000000..1efb8481df --- /dev/null +++ b/rsearch/resources.go @@ -0,0 +1,153 @@ +// Copyright (c) 2016 Pani Networks +// All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package rsearch + +import ( + "encoding/json" + "fmt" + "log" + "net/http" +) + +/* +{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"default","selfLink":"/api/v1/namespaces/default","uid":"d10db271-dc03-11e5-9c86-0213e1312dc5","resourceVersion":"6","creationTimestamp":"2016-02-25T21:07:45Z"},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}} +*/ + +// Event is a representation of a structure that we receive from kubernetes API. +type Event struct { + Type string `json:"Type"` + Object KubeObject `json:"object"` +} + +const ( + KubeEventAdded = "ADDED" + KubeEventDeleted = "DELETED" + InternalEventDeleteAll = "_DELETE_ALL" +) + +// KubeObject is a representation of object in kubernetes. +type KubeObject struct { + Kind string `json:"kind"` + Spec Spec `json:"spec"` + ApiVersion string `json:"apiVersion"` + Metadata Metadata `json:"metadata"` + Status map[string]string `json:"status,omitempty"` +} + +// makeId makes id to identify kube object. +func (o KubeObject) makeId() string { + id := o.Metadata.Name + "/" + o.Metadata.Namespace + return id +} + +// getSelector extracts selector value from KubeObject. +func (o KubeObject) getSelector(config Config) string { + var selector string + // TODO this should use Config.Resource.Selector path instead of podSelector. + for k, v := range o.Spec.PodSelector { + selector = k + "=" + v + "#" + } + return selector +} + +// TODO need to find a way to use different specs for different resources. +type Spec struct { + AllowIncoming map[string]interface{} `json:"allowIncoming"` + PodSelector map[string]string `json:"podSelector"` +} + +// Metadata is a representation of metadata in kubernetes object +type Metadata struct { + Name string `json:"name"` + Namespace string `json:"namespace"` + SelfLink string `json:"selfLink"` + Uid string `json:"uid"` + ResourceVersion string `json:"resourceVersion"` + CreationTimestamp string `json:"creationTimestamp"` + Labels map[string]string `json:"labels"` +} + +// watchEvents maintains goroutine fired by NsWatch, restarts it in case HTTP GET times out. +func watchEvents(done <-chan Done, url string, config Config, resp *http.Response, out chan Event) { + if config.Server.Debug { + log.Println("Received namespace related event from kubernetes", resp.Body) + } + + dec := json.NewDecoder(resp.Body) + var e Event + + for { + select { + case <-done: + return + default: + // Attempting to read event from HTTP connection + err := dec.Decode(&e) + if err != nil { + // If fail + if config.Server.Debug { + log.Printf("Failed to decode message from connection %s due to %s\n. Attempting to re-establish", url, err) + } + // Then stop all goroutines + out <- Event{Type: InternalEventDeleteAll} + + // And try to re-establish HTTP connection + resp, err2 := http.Get(url) + if (err2 != nil) && (config.Server.Debug) { + log.Printf("Failed establish connection %s due to %s\n.", url, err) + } else if err2 == nil { + dec = json.NewDecoder(resp.Body) + } + } else { + // Else submit event + out <- e + } + } + } +} + +// NsWatch is a generator that watches namespace related events in +// kubernetes API and publishes this events to a channel. +func NsWatch(done <-chan Done, url string, config Config) (<-chan Event, error) { + out := make(chan Event) + + resp, err := http.Get(url) + if err != nil { + return out, err + } + + go watchEvents(done, url, config, resp, out) + + return out, nil +} + +// Produce method listens for resource updates happening within givcen namespace +// and publishes this updates in a channel +func (ns KubeObject) Produce(out chan Event, done <-chan Done, config Config) error { + url := fmt.Sprintf("%s/%s/%s/%s", config.Api.Url, config.Resource.UrlPrefix, ns.Metadata.Name, config.Resource.UrlPostfix) + if config.Server.Debug { + log.Println("Launching producer to listen on ", url) + } + + resp, err := http.Get(url) + if err != nil { + return err + } + + go watchEvents(done, url, config, resp, out) + + return nil +} diff --git a/rsearch/resources_test.go b/rsearch/resources_test.go new file mode 100644 index 0000000000..4c32499acc --- /dev/null +++ b/rsearch/resources_test.go @@ -0,0 +1,107 @@ +// Copyright (c) 2016 Pani Networks +// All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package rsearch + +import ( + "fmt" + "gopkg.in/gcfg.v1" + "log" + "math/rand" + "net/http" + "testing" + "time" +) + +const cfgStr = ` +; comment +[api] +url=http://127.0.0.1 +namespaceUrl=api/v1/namespaces + +[resource] +type=3rdParty +namespaced=true +urlPrefix=apis/romana.io/demo/v1/namespaces +urlPostfix=networkpolicys +name=NetworkPolicy +selector=podSelector + +[server] +port=9700 +host=localhost +proto=http +debug=true +` + +const testNs = `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"default","selfLink":"/api/v1/namespaces/default","uid":"d10db271-dc03-11e5-9c86-0213e1312dc5","resourceVersion":"6","creationTimestamp":"2016-02-25T21:07:45Z"},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}} +` + +const testPolicy = `{"type":"ADDED","object":{"apiVersion":"romana.io/demo/v1","kind":"NetworkPolicy","metadata":{"name":"pol1","namespace":"default","selfLink":"/apis/romana.io/demo/v1/namespaces/default/networkpolicys/pol1","uid":"d7036130-e119-11e5-aab8-0213e1312dc5","resourceVersion":"119875","creationTimestamp":"2016-03-03T08:28:00Z","labels":{"owner":"t1"}},"spec":{"allowIncoming":{"from":[{"pods":{"tier":"frontend"}}],"toPorts":[{"port":80,"protocol":"TCP"}]},"podSelector":{"tier":"backend"}}}} +` + +func fakeNsHandler(w http.ResponseWriter, req *http.Request) { + fmt.Fprint(w, testNs) +} + +func fakePolicyHandler(w http.ResponseWriter, req *http.Request) { + fmt.Fprint(w, testPolicy) +} + +func fakeServer(config Config) { + http.HandleFunc("/"+config.Api.NamespaceUrl, fakeNsHandler) + policyUrl := "/" + config.Resource.UrlPrefix + "/default/" + config.Resource.UrlPostfix + http.HandleFunc(policyUrl, fakePolicyHandler) + log.Fatal(http.ListenAndServe(config.Api.Url[7:], nil)) +} + +// Testing ability of nswatch to watch ns events +func TestNsWatch(t *testing.T) { + config := Config{} + err := gcfg.ReadStringInto(&config, cfgStr) + if err != nil { + t.Errorf("Failed to parse gcfg data: %s", err) + } + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + port := 50000 + r.Intn(1000) + config.Api.Url = fmt.Sprintf("%s:%d", config.Api.Url, port) + + go fakeServer(config) + time.Sleep(time.Duration(5 * time.Second)) + + done := make(chan Done) + url := config.Api.Url + "/" + config.Api.NamespaceUrl + nsEvents, err := NsWatch(done, url, config) + if err != nil { + t.Error("Namespace watcher failed to start", err) + } + + ns := <-nsEvents + if ns.Object.Metadata.Name != "default" { + t.Error("Expecting namespace name = default, got =", ns.Object.Metadata.Name) + } + + events := make(chan Event) + err = ns.Object.Produce(events, done, config) + if err != nil { + t.Error("Namespace producer failed to start", err) + } + + e := <-events + if e.Object.Metadata.Name != "pol1" { + t.Error("Expecting policy name = pol1, got =", e.Object.Metadata.Name) + } +} diff --git a/rsearch/server.go b/rsearch/server.go new file mode 100644 index 0000000000..4c03647186 --- /dev/null +++ b/rsearch/server.go @@ -0,0 +1,91 @@ +// Copyright (c) 2016 Pani Networks +// All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package rsearch + +import ( + "encoding/json" + "fmt" + "log" + "net/http" +) + +// SearchRequest associates search request data with channel that will be used +// to fetch response from Processor +type SearchRequest struct { + Tag string `json:"tag"` + Resp chan SearchResponse `json:",omitempty"` +} + +// SearchResponse is a list of kubernetes objects +type SearchResponse []KubeObject + +// responseWaiter dispatches search request and waits for search response to arrive. +func responseWaiter(w http.ResponseWriter, request SearchRequest, inbox chan<- SearchRequest, config Config) { + // Making channel for search responses + request.Resp = make(chan SearchResponse) + defer close(request.Resp) + + // Submitting search object to Processor goroutine + inbox <- request + if config.Server.Debug { + log.Println("ResponseWaiter awaiting for answer") + } + + // Waiting for SearchResponse to arrive + // TODO handle timeouts here + response := <-request.Resp + + if config.Server.Debug { + log.Println("Sending response ", response) + } + + // Preparing response for sending back to client + result, err := json.Marshal(response) + if err != nil { + log.Println("Failed to marshal search response with json marshaller, that's weird as we expect it to come freshly decoded, ", response) + panic(err) + } + + if config.Server.Debug { + log.Println("Sending response ", string(result)) + } + fmt.Fprint(w, string(result)) +} + +// handler to serve search requests from client from this library. +func handler(w http.ResponseWriter, r *http.Request, inbox chan<- SearchRequest, config Config) { + decoder := json.NewDecoder(r.Body) + request := SearchRequest{} + err := decoder.Decode(&request) + if err != nil { + panic(err) + } + + if config.Server.Debug { + log.Printf("Server passing request %s to Processor", request) + } + responseWaiter(w, request, inbox, config) +} + +// Serve responses from caching server. +// Inbox is a channel for submitting SearchRequest's, there is an instance +// of Process goroutine is listening on other side. +func Serve(config Config, inbox chan<- SearchRequest) { + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + handler(w, r, inbox, config) + }) + log.Fatal(http.ListenAndServe(":"+config.Server.Port, nil)) +}