Skip to content

Commit

Permalink
rename sharecache to multiplexer.
Browse files Browse the repository at this point in the history
  • Loading branch information
zyjhtangtang committed Sep 14, 2024
1 parent fbfa01f commit 8ecdab1
Show file tree
Hide file tree
Showing 13 changed files with 1,289 additions and 10 deletions.
16 changes: 8 additions & 8 deletions pkg/yurthub/filter/servicetopology/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"context"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
discoveryv1 "k8s.io/api/discovery/v1"
discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -130,7 +130,7 @@ func (stf *serviceTopologyFilter) Filter(obj runtime.Object, stopCh <-chan struc
}

switch v := obj.(type) {
case *v1.Endpoints, *discoveryV1beta1.EndpointSlice, *discovery.EndpointSlice:
case *v1.Endpoints, *discoveryV1beta1.EndpointSlice, *discoveryv1.EndpointSlice:
return stf.serviceTopologyHandler(v)
default:
return obj
Expand Down Expand Up @@ -164,9 +164,9 @@ func (stf *serviceTopologyFilter) resolveServiceTopologyType(obj runtime.Object)
case *discoveryV1beta1.EndpointSlice:
svcNamespace = v.Namespace
svcName = v.Labels[discoveryV1beta1.LabelServiceName]
case *discovery.EndpointSlice:
case *discoveryv1.EndpointSlice:
svcNamespace = v.Namespace
svcName = v.Labels[discovery.LabelServiceName]
svcName = v.Labels[discoveryv1.LabelServiceName]
case *v1.Endpoints:
svcNamespace = v.Namespace
svcName = v.Name
Expand All @@ -190,7 +190,7 @@ func (stf *serviceTopologyFilter) nodeTopologyHandler(obj runtime.Object) runtim
switch v := obj.(type) {
case *discoveryV1beta1.EndpointSlice:
return reassembleV1beta1EndpointSlice(v, stf.nodeName, nil)
case *discovery.EndpointSlice:
case *discoveryv1.EndpointSlice:
return reassembleEndpointSlice(v, stf.nodeName, nil)
case *v1.Endpoints:
return reassembleEndpoints(v, stf.nodeName, nil)
Expand All @@ -215,7 +215,7 @@ func (stf *serviceTopologyFilter) nodePoolTopologyHandler(obj runtime.Object) ru
switch v := obj.(type) {
case *discoveryV1beta1.EndpointSlice:
return reassembleV1beta1EndpointSlice(v, "", nodes)
case *discovery.EndpointSlice:
case *discoveryv1.EndpointSlice:
return reassembleEndpointSlice(v, "", nodes)
case *v1.Endpoints:
return reassembleEndpoints(v, "", nodes)
Expand Down Expand Up @@ -252,13 +252,13 @@ func reassembleV1beta1EndpointSlice(endpointSlice *discoveryV1beta1.EndpointSlic
}

// reassembleEndpointSlice will discard endpoints that are not on the same node/nodePool for v1.EndpointSlice
func reassembleEndpointSlice(endpointSlice *discovery.EndpointSlice, nodeName string, nodes []string) *discovery.EndpointSlice {
func reassembleEndpointSlice(endpointSlice *discoveryv1.EndpointSlice, nodeName string, nodes []string) *discoveryv1.EndpointSlice {
if len(nodeName) != 0 && len(nodes) != 0 {
klog.Warningf("reassembleEndpointSlice: nodeName(%s) and nodePool can not be set at the same time", nodeName)
return endpointSlice
}

var newEps []discovery.Endpoint
var newEps []discoveryv1.Endpoint
for i := range endpointSlice.Endpoints {
if len(nodeName) != 0 {
if *endpointSlice.Endpoints[i].NodeName == nodeName {
Expand Down
74 changes: 74 additions & 0 deletions pkg/yurthub/multiplexer/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright 2024 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package multiplexer

import (
"context"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
kstorage "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher"
"k8s.io/client-go/kubernetes/scheme"
)

type Interface interface {
Watch(ctx context.Context, key string, opts kstorage.ListOptions) (watch.Interface, error)
GetList(ctx context.Context, key string, opts kstorage.ListOptions, listObj runtime.Object) error
}

type ResourceCacheConfig struct {
KeyFunc func(runtime.Object) (string, error)
NewFunc func() runtime.Object
NewListFunc func() runtime.Object
GetAttrsFunc kstorage.AttrFunc
NamespaceScoped bool
}

func NewResourceCache(
s kstorage.Interface,
resource *schema.GroupVersionResource,
config *ResourceCacheConfig) (Interface, func(), error) {

cacheConfig := cacher.Config{
Storage: s,
Versioner: kstorage.APIObjectVersioner{},
GroupResource: resource.GroupResource(),
KeyFunc: config.KeyFunc,
NewFunc: config.NewFunc,
NewListFunc: config.NewListFunc,
GetAttrsFunc: config.GetAttrsFunc,
Codec: scheme.Codecs.LegacyCodec(resource.GroupVersion()),
}

cacher, err := cacher.NewCacherFromConfig(cacheConfig)
if err != nil {
return nil, func() {}, fmt.Errorf("failed to new cacher from config, error: %v", err)

Check warning on line 63 in pkg/yurthub/multiplexer/cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/multiplexer/cache.go#L63

Added line #L63 was not covered by tests
}

var once sync.Once
destroyFunc := func() {
once.Do(func() {
cacher.Stop()
})

Check warning on line 70 in pkg/yurthub/multiplexer/cache.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/multiplexer/cache.go#L68-L70

Added lines #L68 - L70 were not covered by tests
}

return cacher, destroyFunc, nil
}
143 changes: 143 additions & 0 deletions pkg/yurthub/multiplexer/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
Copyright 2024 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package multiplexer

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/storage"

ystorage "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage"
)

var serviceGVR = &schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "services",
}

var serviceKeyFunc = func(obj runtime.Object) (string, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return "", err
}

return registry.NamespaceKeyFunc(request.WithNamespace(request.NewContext(), accessor.GetNamespace()), "", accessor.GetName())
}

var newServiceFunc = func() runtime.Object {
return &v1.Service{}
}

var newServiceListFunc = func() runtime.Object {
return &v1.ServiceList{}
}

func TestResourceCache_GetList(t *testing.T) {
cache, _, err := NewResourceCache(
ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}),
serviceGVR,
&ResourceCacheConfig{
serviceKeyFunc,
newServiceFunc,
newServiceListFunc,
storage.DefaultNamespaceScopedAttr,
true,
},
)

assert.Nil(t, err)
assertCacheGetList(t, cache)
}

func mockListOptions() storage.ListOptions {
return storage.ListOptions{
ResourceVersion: "100",
Recursive: true,
Predicate: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
},
}
}

func assertCacheGetList(t testing.TB, cache Interface) {
t.Helper()

serviceList := &v1.ServiceList{}
err := cache.GetList(context.Background(), "", mockListOptions(), serviceList)

assert.Nil(t, err)
assert.Equal(t, 1, len(serviceList.Items))
}

func TestResourceCache_Watch(t *testing.T) {
fakeStorage := ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")})

cache, _, err := NewResourceCache(
fakeStorage,
serviceGVR,
&ResourceCacheConfig{
serviceKeyFunc,
newServiceFunc,
newServiceListFunc,
storage.DefaultNamespaceScopedAttr,
true,
},
)

assert.Nil(t, err)
assertCacheWatch(t, cache, fakeStorage)
}

func mockWatchOptions() storage.ListOptions {
var sendInitialEvents = true

return storage.ListOptions{
ResourceVersion: "100",
Predicate: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
},
Recursive: true,
SendInitialEvents: &sendInitialEvents,
}
}

func assertCacheWatch(t testing.TB, cache Interface, fs *ystorage.FakeServiceStorage) {
receive, err := cache.Watch(context.TODO(), "", mockWatchOptions())

go func() {
fs.AddWatchObject(newService(metav1.NamespaceSystem, "coredns2"))
}()

assert.Nil(t, err)
event := <-receive.ResultChan()
assert.Equal(t, watch.Added, event.Type)
}
Loading

0 comments on commit 8ecdab1

Please sign in to comment.