diff --git a/.github/workflows/ci-pr.yaml b/.github/workflows/ci-pr.yaml index f5daeb2..19af462 100644 --- a/.github/workflows/ci-pr.yaml +++ b/.github/workflows/ci-pr.yaml @@ -19,7 +19,7 @@ jobs: if: github.event_name == 'pull_request' uses: golangci/golangci-lint-action@v2 with: - version: v1.29 + version: v1.46.2 go-unit-test: runs-on: ubuntu-20.04 needs: [style-check] diff --git a/api/extend/deploy2service.go b/api/extend/deploy2service.go index a4543d0..1dada9e 100644 --- a/api/extend/deploy2service.go +++ b/api/extend/deploy2service.go @@ -35,7 +35,7 @@ func Deploy2Service(r *api.ReqContext) interface{} { cluster = common.GetConfig().DefaultCluster } p := page.Paginate{Search: "name=" + dep} - p.Clusters([]string{cluster}) + _ = p.Clusters([]string{cluster}) res := r.Store.Query(podGvr, store.Query{ Namespace: ns, Paginate: p, @@ -54,7 +54,7 @@ func Deploy2Service(r *api.ReqContext) interface{} { } } p = page.Paginate{} - p.Clusters([]string{cluster}) + _ = p.Clusters([]string{cluster}) res = r.Store.Query(svcGvr, store.Query{ Namespace: ns, Paginate: p, @@ -66,7 +66,7 @@ func Deploy2Service(r *api.ReqContext) interface{} { svc := &v1.Service{} if s, ok := svcIf.(*watcher.ObjType); ok { bs, _ := json.Marshal(s) - json.Unmarshal(bs, svc) + _ = json.Unmarshal(bs, svc) } else { svc = svcIf.(*v1.Service) } diff --git a/api/proxy.go b/api/proxy.go index b893962..7bd87ec 100644 --- a/api/proxy.go +++ b/api/proxy.go @@ -101,7 +101,7 @@ func parsePaginateAndLabelsAndClean(r *http.Request) (*page.Paginate, *v1.LabelS var paginate page.Paginate var labelSelectorStr string clusterPrefix := constants.ClusterPrefix - cluster := common.GetConfig().DefaultCluster + cluster := "" query := r.URL.Query() for k, v := range query { switch k { @@ -121,7 +121,7 @@ func parsePaginateAndLabelsAndClean(r *http.Request) (*page.Paginate, *v1.LabelS opts, err := ioutil.ReadAll(body) if err == nil { options := v1.DeleteOptions{} - json.Unmarshal(opts, &options) + _ = json.Unmarshal(opts, &options) if len(options.DryRun) > 0 && strings.HasPrefix(options.DryRun[0], clusterPrefix) { cluster = options.DryRun[0][len(clusterPrefix):] options.DryRun = options.DryRun[1:] @@ -166,15 +166,21 @@ func parsePaginateAndLabelsAndClean(r *http.Request) (*page.Paginate, *v1.LabelS if err != nil { return nil, labels, cluster, err } - json.Unmarshal(rr, &paginate) + _ = json.Unmarshal(rr, &paginate) delete(labels.MatchLabels, constants.PaginateKey) } - query.Set("labelSelector", v1.FormatLabelSelector(labels)) + if len(labels.MatchLabels) != 0 || len(labels.MatchExpressions) != 0 { + // if labelSelectorStr is empty, v1.FormatLabelSelector will return "", so we should not use it. + query.Set("labelSelector", v1.FormatLabelSelector(labels)) + } } r.URL.RawQuery = query.Encode() if cs := paginate.GetClusters(); len(cs) > 0 && cluster == "" { cluster = cs[0] } + if cluster == "" { + cluster = common.GetConfig().DefaultCluster + } return &paginate, labels, cluster, nil } @@ -182,6 +188,7 @@ func Proxy(r *ReqContext) interface{} { // version := mux.Vars(r.Request)["version"] namespace := mux.Vars(r.Request)["namespace"] resourceName := mux.Vars(r.Request)["resource"] + gvr := getGVRFromReq(r.Request) paginate, labels, cluster, err := parsePaginateAndLabelsAndClean(r.Request) if err != nil { return proxyPass(r, cluster) @@ -189,7 +196,6 @@ func Proxy(r *ReqContext) interface{} { if cluster == "" { cluster = common.GetConfig().DefaultCluster } - gvr := getGVRFromReq(r.Request) for k, v := range r.Request.URL.Query() { switch k { case "labelSelector": @@ -223,7 +229,7 @@ func Proxy(r *ReqContext) interface{} { log.Debugf("got paginate %v", paginate) items := make([]interface{}, 0) - var total int64 = 0 + var total int64 if labels != nil && (len(labels.MatchLabels) != 0 || len(labels.MatchExpressions) != 0) { // exists label selector res := r.Store.Query(gvr, store.Query{ @@ -260,7 +266,7 @@ func Proxy(r *ReqContext) interface{} { // manually slice items var l = int64(len(items)) - var start, end int64 = 0, 0 + var start, end int64 if paginate.PageSize == 0 || paginate.Page == 0 { // all resources start = 0 @@ -346,7 +352,7 @@ func serverPrint(items []interface{}) interface{} { continue } indexes := map[string]string{} - json.Unmarshal([]byte(indexesStr), &indexes) + _ = json.Unmarshal([]byte(indexesStr), &indexes) if i == 0 { commonCols := []string{"cluster", "namespace", "name"} if _, ok := indexes["namespace"]; !ok { @@ -440,12 +446,12 @@ func proxyPassWatch(r *ReqContext, cluster string) interface{} { t := make([]byte, 1) _, err := reader.Read(t) if err != nil { - r.Writer.Write(buf.Bytes()) + _, _ = r.Writer.Write(buf.Bytes()) return nil } buf.Write(t) if t[0] == '\n' { - r.Writer.Write(buf.Bytes()) + _, _ = r.Writer.Write(buf.Bytes()) buf.Reset() } select { diff --git a/api/proxy_test.go b/api/proxy_test.go index ec0d898..ee237f6 100644 --- a/api/proxy_test.go +++ b/api/proxy_test.go @@ -6,14 +6,15 @@ import ( "net/http" "testing" - "github.com/DaoCloud/ckube/common" - "github.com/DaoCloud/ckube/store" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + + "github.com/DaoCloud/ckube/common" + "github.com/DaoCloud/ckube/store" ) type fakeWriter struct { @@ -63,13 +64,6 @@ var podsMap = map[string]string{ "resourceType": "pods", } -var nsMap = map[string]string{ - "namespace": "default", - "group": "", - "version": "v1", - "resourceType": "pods", -} - func podsInterfaces(pods []v1.Pod) []interface{} { a := []interface{}{} for _, p := range pods { diff --git a/cmd/cacheproxy/main.go b/cmd/cacheproxy/main.go index 73ccf61..caf0231 100644 --- a/cmd/cacheproxy/main.go +++ b/cmd/cacheproxy/main.go @@ -4,6 +4,16 @@ import ( "encoding/json" "flag" "fmt" + "io/ioutil" + "os" + "path" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + kubeapi "k8s.io/client-go/tools/clientcmd/api/v1" + "sigs.k8s.io/yaml" + "github.com/DaoCloud/ckube/common" "github.com/DaoCloud/ckube/log" "github.com/DaoCloud/ckube/server" @@ -12,14 +22,6 @@ import ( "github.com/DaoCloud/ckube/utils" "github.com/DaoCloud/ckube/utils/prommonitor" "github.com/DaoCloud/ckube/watcher" - "io/ioutil" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - kubeapi "k8s.io/client-go/tools/clientcmd/api/v1" - "os" - "path" - "sigs.k8s.io/yaml" ) func GetK8sConfigConfigWithFile(kubeconfig, context string) *rest.Config { @@ -156,7 +158,7 @@ func loadFromConfig(kubeConfig, configFile string) (map[string]kubernetes.Interf } m := memory.NewMemoryStore(indexConf) w := watcher.NewWatcher(clusterConfigs, storeGVRConfig, m) - w.Start() + _ = w.Start() return clusterClients, w, m, nil } @@ -195,33 +197,29 @@ func main() { } defer fixedWatcher.Close() go func() { - for { - select { - case e := <-fixedWatcher.Events(): - log.Infof("get file watcher event: %v", e) - switch e.Type { - case utils.EventTypeChanged: - // do reload - case utils.EventTypeError: - log.Errorf("got file watcher error type: file: %s", e.Name) - break - // do reload - } - clis, rw, rs, err := loadFromConfig(kubeConfig, configFile) - if err != nil { - prommonitor.ConfigReload.WithLabelValues("failed").Inc() - log.Errorf("watcher: reload config error: %v", err) - continue - } - prommonitor.Resources.Reset() - w.Stop() - w = rw - ser.ResetStore(rs, clis) // reset store - prommonitor.ConfigReload.WithLabelValues("success").Inc() - log.Infof("auto reloaded config successfully") + for e := range fixedWatcher.Events() { + log.Infof("get file watcher event: %v", e) + switch e.Type { + case utils.EventTypeChanged: + // do reload + case utils.EventTypeError: + log.Errorf("got file watcher error type: file: %s", e.Name) + // do reload + } + clis, rw, rs, err := loadFromConfig(kubeConfig, configFile) + if err != nil { + prommonitor.ConfigReload.WithLabelValues("failed").Inc() + log.Errorf("watcher: reload config error: %v", err) + continue } + prommonitor.Resources.Reset() + _ = w.Stop() + w = rw + ser.ResetStore(rs, clis) // reset store + prommonitor.ConfigReload.WithLabelValues("success").Inc() + log.Infof("auto reloaded config successfully") } }() } - ser.Run() + _ = ser.Run() } diff --git a/cmd/ckube-plugin/main.go b/cmd/ckube-plugin/main.go index 55d2520..afe35a8 100644 --- a/cmd/ckube-plugin/main.go +++ b/cmd/ckube-plugin/main.go @@ -1,12 +1,14 @@ package main import ( - "github.com/DaoCloud/ckube/log" - "github.com/DaoCloud/ckube/page" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "os" "os/exec" "strings" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/DaoCloud/ckube/log" + "github.com/DaoCloud/ckube/page" ) const ( @@ -28,7 +30,7 @@ func main() { typ = get case "create": typ = create - //case "delete": + // case "delete": // typ = del } if a == "--clusters" { @@ -57,7 +59,7 @@ func main() { switch typ { case get: p := page.Paginate{} - p.Clusters(cs) + _ = p.Clusters(cs) selector := "" if selectorPos != 0 { selector = args[selectorPos] @@ -76,15 +78,15 @@ func main() { os.Exit(2) } p := page.Paginate{} - p.Clusters(cs) + _ = p.Clusters(cs) o, _ := page.QueryCreateOptions(metav1.CreateOptions{}, cs[0]) args = append(args, "--field-manager", o.FieldManager) } } c := exec.Command("kubectl", args...) - //fmt.Printf("args %v\n", args) + // fmt.Printf("args %v\n", args) c.Stdin = os.Stdin c.Stdout = os.Stdout c.Stderr = os.Stderr - c.Run() + _ = c.Run() } diff --git a/cmd/ckubecli/main.go b/cmd/ckubecli/main.go index 5d847e2..721b787 100644 --- a/cmd/ckubecli/main.go +++ b/cmd/ckubecli/main.go @@ -6,8 +6,9 @@ import ( "os" "strings" - "github.com/DaoCloud/ckube/page" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/DaoCloud/ckube/page" ) func main() { @@ -30,7 +31,7 @@ func main() { } if clusters != "" { ccs := strings.Split(clusters, ",") - p.Clusters(ccs) + _ = p.Clusters(ccs) } o, err := page.QueryListOptions(v1.ListOptions{}, p) if err != nil { diff --git a/pkg/client/fake/fake_ckube.go b/pkg/client/fake/fake_ckube.go index 88fa970..9a4095c 100644 --- a/pkg/client/fake/fake_ckube.go +++ b/pkg/client/fake/fake_ckube.go @@ -45,18 +45,12 @@ func NewFakeCKubeServer(listenAddr string, config string) (CkubeServer, error) { cfg.Token = "" common.InitConfig(&cfg) indexConf := map[store.GroupVersionResource]map[string]string{} - storeGVRConfig := []store.GroupVersionResource{} for _, proxy := range cfg.Proxies { indexConf[store.GroupVersionResource{ Group: proxy.Group, Version: proxy.Version, Resource: proxy.Resource, }] = proxy.Index - storeGVRConfig = append(storeGVRConfig, store.GroupVersionResource{ - Group: proxy.Group, - Version: proxy.Version, - Resource: proxy.Resource, - }) } m := memory.NewMemoryStore(indexConf) addr := "http://" + func() string { @@ -76,7 +70,7 @@ func NewFakeCKubeServer(listenAddr string, config string) (CkubeServer, error) { } ser := server.NewMuxServer(listenAddr, nil, m, s.registerFakeRoute) s.ser = ser - go ser.Run() + go ser.Run() // nolint: errcheck for i := 0; i < 5; i++ { time.Sleep(time.Millisecond * 100 * time.Duration(1< 1 { - method = keys[0] - path = strings.Join(keys[1:], ":") - } else { - method = "*" - path = key - } - return -} - func jsonResp(writer http.ResponseWriter, status int, v interface{}) { b, _ := json.Marshal(v) writer.Header().Set("Content-Type", "application/json") writer.WriteHeader(status) - writer.Write(b) + _, _ = writer.Write(b) } func (m *muxServer) registerRoutes(router *mux.Router, handleRoutes []route) { @@ -173,8 +162,7 @@ func (m *muxServer) registerRoutes(router *mux.Router, handleRoutes []route) { return } } - var res interface{} - res = route.handler(&api.ReqContext{ + var res = route.handler(&api.ReqContext{ ClusterClients: m.clusterClients, Store: m.store, Request: r, @@ -184,19 +172,19 @@ func (m *muxServer) registerRoutes(router *mux.Router, handleRoutes []route) { return } var status int - switch res.(type) { + switch res := res.(type) { case error: log.Errorf("request return a unexpected error: %v", res) panic(res) case v1.Status: - status = int(res.(v1.Status).Code) + status = int(res.Code) case *v1.Status: - status = int(res.(*v1.Status).Code) + status = int(res.Code) case string: - writer.Write([]byte(res.(string))) + _, _ = writer.Write([]byte(res)) return case []byte: - writer.Write(res.([]byte)) + _, _ = writer.Write(res) return default: status = route.successStatus diff --git a/store/memory/memory.go b/store/memory/memory.go index dd2079c..3a9bdb6 100644 --- a/store/memory/memory.go +++ b/store/memory/memory.go @@ -4,18 +4,20 @@ import ( "bytes" "encoding/json" "fmt" - "github.com/DaoCloud/ckube/utils/prommonitor" "sort" "strconv" "strings" "sync" + "github.com/DaoCloud/ckube/utils/prommonitor" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/jsonpath" + "github.com/DaoCloud/ckube/common/constants" "github.com/DaoCloud/ckube/log" "github.com/DaoCloud/ckube/store" "github.com/DaoCloud/ckube/utils" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/util/jsonpath" ) type resourceObj struct { @@ -44,7 +46,7 @@ func NewMemoryStore(indexConf map[store.GroupVersionResource]map[string]string) indexConf: indexConf, } resourceMap := make(map[store.GroupVersionResource]clusterResource) - for k, _ := range indexConf { + for k := range indexConf { resourceMap[k] = clusterResource{} } s.resourceMap = resourceMap @@ -292,7 +294,7 @@ func (m *memoryStore) Query(gvr store.GroupVersionResource, query store.Query) s return res } res.Total = l - var start, end int64 = 0, 0 + var start, end int64 if query.PageSize == 0 { // all resources start = 0 @@ -323,7 +325,7 @@ func (m *memoryStore) buildResourceWithIndex(gvr store.GroupVersionResource, clu mobj := utils.Obj2JSONMap(obj) for k, v := range m.indexConf[gvr] { w := bytes.NewBuffer([]byte{}) - jp.Parse(v) + _ = jp.Parse(v) err := jp.Execute(w, mobj) if err != nil { log.Warnf("exec jsonpath error: %v, %v", obj, err) diff --git a/store/memory/memory_test.go b/store/memory/memory_test.go index 2f44b53..3dfc56b 100644 --- a/store/memory/memory_test.go +++ b/store/memory/memory_test.go @@ -4,13 +4,14 @@ import ( "fmt" "testing" - "github.com/DaoCloud/ckube/common/constants" - "github.com/DaoCloud/ckube/page" - "github.com/DaoCloud/ckube/store" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + + "github.com/DaoCloud/ckube/common/constants" + "github.com/DaoCloud/ckube/page" + "github.com/DaoCloud/ckube/store" ) var podsGVR = store.GroupVersionResource{ @@ -743,7 +744,7 @@ func TestMemoryStore_Query(t *testing.T) { t.Run(fmt.Sprintf("%d-%s", i, c.name), func(t *testing.T) { s := NewMemoryStore(testIndexConf) for _, r := range c.resources { - s.OnResourceAdded(c.gvr, "", r) + _ = s.OnResourceAdded(c.gvr, "", r) } res := s.Query(c.gvr, c.query) assert.Equal(t, c.res, res) diff --git a/utils/fs_watcher.go b/utils/fs_watcher.go index 1d71426..4c16ab3 100644 --- a/utils/fs_watcher.go +++ b/utils/fs_watcher.go @@ -1,12 +1,13 @@ package utils import ( - "github.com/DaoCloud/ckube/log" - "github.com/fsnotify/fsnotify" "io" "os" - "sync" "time" + + "github.com/fsnotify/fsnotify" + + "github.com/DaoCloud/ckube/log" ) type FixedFileWatcher interface { @@ -29,7 +30,6 @@ const ( type fixedFileWatcher struct { files []string - mux sync.RWMutex fswatcher *fsnotify.Watcher events chan Event } @@ -65,7 +65,7 @@ func (w *fixedFileWatcher) Start() error { } } go func() { - for { + for { // nolint:gosimple select { case e, open := <-w.fswatcher.Events: if !open { @@ -79,7 +79,7 @@ func (w *fixedFileWatcher) Start() error { case fsnotify.Remove: // 在 Kubernetes 里面,当挂载 ConfigMap 的时候,如果发生文件重新,Kubernetes 会首先删除这个文件 // 再重新创建,所以我们应该在删除之后重新建立 watcher。 - w.fswatcher.Remove(e.Name) + _ = w.fswatcher.Remove(e.Name) time.Sleep(time.Second * 2) // 等待一定时间之后重新加入 watcher 队列 err := w.fswatcher.Add(e.Name) diff --git a/utils/utils.go b/utils/utils.go index d792540..071c8e1 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -5,6 +5,6 @@ import "encoding/json" func Obj2JSONMap(obj interface{}) map[string]interface{} { m := make(map[string]interface{}) bs, _ := json.Marshal(obj) - json.Unmarshal(bs, &m) + _ = json.Unmarshal(bs, &m) return m } diff --git a/watcher/watcher.go b/watcher/watcher.go index c0a2144..5a205ab 100644 --- a/watcher/watcher.go +++ b/watcher/watcher.go @@ -8,15 +8,16 @@ import ( "sync" "time" - "github.com/DaoCloud/ckube/common" - "github.com/DaoCloud/ckube/log" - "github.com/DaoCloud/ckube/store" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" + + "github.com/DaoCloud/ckube/common" + "github.com/DaoCloud/ckube/log" + "github.com/DaoCloud/ckube/store" ) type watcher struct { @@ -50,7 +51,7 @@ type ObjType struct { func (o *ObjType) UnmarshalJSON(bytes []byte) error { m := map[string]interface{}{} - json.Unmarshal(bytes, &m) + _ = json.Unmarshal(bytes, &m) if v, ok := m["apiVersion"]; ok { o.APIVersion = v.(string) } @@ -59,7 +60,7 @@ func (o *ObjType) UnmarshalJSON(bytes []byte) error { } if meta, ok := m["metadata"]; ok { bs, _ := json.Marshal(meta) - json.Unmarshal(bs, &o.ObjectMeta) + _ = json.Unmarshal(bs, &o.ObjectMeta) } delete(m, "apiVersion") delete(m, "kind") @@ -97,8 +98,8 @@ func (o ObjType) GetObjectKind() schema.ObjectKind { } func (o ObjType) DeepCopyObject() runtime.Object { - //o.lock.Lock() - //defer o.lock.Unlock() + // o.lock.Lock() + // defer o.lock.Unlock() m := map[string]interface{}{} for k, v := range o.Data { m[k] = v @@ -140,7 +141,7 @@ func (w *watcher) watchResources(r store.GroupVersionResource, cluster string) { return default: } - ctx, calcel := context.WithTimeout(context.Background(), time.Hour) + ctx, cancel := context.WithTimeout(context.Background(), time.Hour) url := "" if r.Group == "" { url = fmt.Sprintf("/api/%s/%s?watch=true", r.Version, r.Resource) @@ -160,17 +161,17 @@ func (w *watcher) watchResources(r store.GroupVersionResource, cluster string) { if first { // only clean resource at the first time // to avoid the resources gone after server break. - w.store.Clean(r, cluster) + _ = w.store.Clean(r, cluster) first = false } if open { switch rr.Type { case watch.Added: - w.store.OnResourceAdded(r, cluster, rr.Object) + _ = w.store.OnResourceAdded(r, cluster, rr.Object) case watch.Modified: - w.store.OnResourceModified(r, cluster, rr.Object) + _ = w.store.OnResourceModified(r, cluster, rr.Object) case watch.Deleted: - w.store.OnResourceDeleted(r, cluster, rr.Object) + _ = w.store.OnResourceDeleted(r, cluster, rr.Object) case watch.Error: log.Warnf("cluster(%s): watch stream(%v) error: %v", cluster, r, rr.Object) } @@ -182,11 +183,12 @@ func (w *watcher) watchResources(r store.GroupVersionResource, cluster string) { } case <-w.stop: ww.Stop() + cancel() return } } } - calcel() + cancel() } }