Skip to content

Commit

Permalink
chore: support control and data clusters can be the same one (#7119)
Browse files Browse the repository at this point in the history
  • Loading branch information
leon-inf authored Apr 25, 2024
1 parent 937691c commit fe329ab
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 36 deletions.
2 changes: 1 addition & 1 deletion cmd/dataprotection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func main() {
viper.SetDefault(constant.CfgKeyServerInfo, *ver)

// multi-cluster manager for all data-plane k8s
multiClusterMgr, err := multicluster.Setup(mgr.GetScheme(), mgr.GetClient(), multiClusterKubeConfig, multiClusterContexts)
multiClusterMgr, err := multicluster.Setup(mgr.GetScheme(), mgr.GetConfig(), mgr.GetClient(), multiClusterKubeConfig, multiClusterContexts)
if err != nil {
setupLog.Error(err, "unable to setup multi-cluster manager")
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func main() {
}

// multi-cluster manager for all data-plane k8s
multiClusterMgr, err := multicluster.Setup(mgr.GetScheme(), mgr.GetClient(), multiClusterKubeConfig, multiClusterContexts)
multiClusterMgr, err := multicluster.Setup(mgr.GetScheme(), mgr.GetConfig(), mgr.GetClient(), multiClusterKubeConfig, multiClusterContexts)
if err != nil {
setupLog.Error(err, "unable to setup multi-cluster manager")
os.Exit(1)
Expand Down
53 changes: 46 additions & 7 deletions pkg/controller/multicluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ type clientReader struct {
var _ client.Reader = &clientReader{}

func (c *clientReader) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
request := func(cli client.Client) error {
return cli.Get(ctx, key, obj, opts...)
request := func(cc contextCli, o client.Object) error {
return cc.cli.Get(ctx, key, o, opts...)
}
return anyOf(c.mctx, ctx, obj, request, opts)
}
Expand Down Expand Up @@ -215,8 +215,8 @@ type subResourceReader struct {
var _ client.SubResourceReader = &subResourceReader{}

func (c *subResourceReader) Get(ctx context.Context, obj client.Object, subResource client.Object, opts ...client.SubResourceGetOption) error {
request := func(cli client.Client) error {
return cli.SubResource(c.subResource).Get(ctx, obj, subResource, opts...)
request := func(cc contextCli, o client.Object) error {
return cc.cli.SubResource(c.subResource).Get(ctx, o, subResource, opts...)
}
return anyOf(c.mctx, ctx, obj, request, opts)
}
Expand Down Expand Up @@ -272,13 +272,40 @@ func allOf(mctx mcontext, ctx context.Context, obj client.Object, request func(c
return err
}

func anyOf(mctx mcontext, ctx context.Context, obj client.Object, request func(client.Client) error, opts any) error {
func anyOf(mctx mcontext, ctx context.Context, obj client.Object, request func(contextCli, client.Object) error, opts any) error {
o := hasClientOption(opts)
if o == nil && !o.multiCheck {
return anyOf_(mctx, ctx, obj, request, opts)
}
return anyOfWithMultiCheck(mctx, ctx, obj, request, opts)
}

func anyOf_(mctx mcontext, ctx context.Context, obj client.Object, request func(contextCli, client.Object) error, opts any) error {
var err error
for _, cc := range resolvedClients(mctx, ctx, obj, opts) {
if err = request(cc.cli); err == nil {
if e := request(cc, obj); e == nil {
return nil
} else if err == nil {
err = e
}
}
return err
}

func anyOfWithMultiCheck(mctx mcontext, ctx context.Context, obj client.Object, request func(contextCli, client.Object) error, opts any) error {
var err error
objs := make([]client.Object, 0)
for _, cc := range resolvedClients(mctx, ctx, obj, opts) {
o := obj.DeepCopyObject().(client.Object)
if e := request(cc, o); e == nil {
objs = append(objs, o)
} else if err == nil {
err = e
}
}
if len(objs) > 0 {
reflect.ValueOf(obj).Elem().Set(reflect.ValueOf(objs[0]).Elem())
}
return err
}

Expand Down Expand Up @@ -307,7 +334,7 @@ func resolvedClients(mctx mcontext, ctx context.Context, obj client.Object, opts
}

if o.universal {
return append([]contextCli{{"", mctx.control}}, dataClients(mctx, fromContext(ctx))...)
return removeDuplicate(append([]contextCli{{"", mctx.control}}, dataClients(mctx, fromContext(ctx))...))
}

if o.oneshot {
Expand Down Expand Up @@ -343,3 +370,15 @@ func dataClients(mctx mcontext, workers []string) []contextCli {
}
return l
}

func removeDuplicate(clients []contextCli) []contextCli {
m := make(map[string]bool)
l := make([]contextCli, 0)
for i, c := range clients {
if !m[c.context] {
m[c.context] = true
l = append(l, clients[i])
}
}
return l
}
20 changes: 13 additions & 7 deletions pkg/controller/multicluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,25 +59,31 @@ func (m *manager) GetContexts() []string {
}

func (m *manager) Bind(mgr ctrl.Manager) error {
for k := range m.caches {
if err := mgr.Add(m.caches[k]); err != nil {
return fmt.Errorf("failed to bind cache to Manager: %s", err.Error())
for k, c := range m.caches {
if c != nil {
if err := mgr.Add(m.caches[k]); err != nil {
return fmt.Errorf("failed to bind cache to Manager: %s", err.Error())
}
}
}
return nil
}

func (m *manager) Own(b *builder.Builder, obj, owner client.Object) Manager {
handler := handler.EnqueueRequestForOwner(m.cli.Scheme(), m.cli.RESTMapper(), owner, handler.OnlyControllerOwner())
for k := range m.caches {
b.WatchesRawSource(source.Kind(m.caches[k], obj), handler)
for k, c := range m.caches {
if c != nil {
b.WatchesRawSource(source.Kind(m.caches[k], obj), handler)
}
}
return m
}

func (m *manager) Watch(b *builder.Builder, obj client.Object, eventHandler handler.EventHandler) Manager {
for k := range m.caches {
b.WatchesRawSource(source.Kind(m.caches[k], obj), eventHandler)
for k, c := range m.caches {
if c != nil {
b.WatchesRawSource(source.Kind(m.caches[k], obj), eventHandler)
}
}
return m
}
7 changes: 7 additions & 0 deletions pkg/controller/multicluster/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,18 @@ func Oneshot() *ClientOption {
}
}

func MultiCheck() *ClientOption {
return &ClientOption{
multiCheck: true,
}
}

type ClientOption struct {
control bool // control plane
universal bool // both control and data planes
unspecified bool // data planes, but don't know which ones exactly
oneshot bool
multiCheck bool // only support the Get operation
}

func (o *ClientOption) ApplyToGet(*client.GetOptions) {
Expand Down
73 changes: 53 additions & 20 deletions pkg/controller/multicluster/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,63 +35,96 @@ import (
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)

func Setup(scheme *runtime.Scheme, cli client.Client, kubeConfig, contexts string) (Manager, error) {
func Setup(scheme *runtime.Scheme, cfg *rest.Config, cli client.Client, kubeConfig, contexts string) (Manager, error) {
if len(contexts) == 0 {
return nil, nil
}
clients, caches, err := newClientNCache(scheme, kubeConfig, contexts)
mcc, err := newClientNCache(scheme, kubeConfig, contexts)
if err != nil {
return nil, err
}
for k, c := range mcc {
if isSameContextWithControl(cfg, c) {
cc := mcc[k]
// reset the cache and use default cli of control cluster
cc.cache = nil
cc.client = cli
mcc[k] = cc
}
}

clients := func() map[string]client.Client {
m := make(map[string]client.Client)
for _, c := range mcc {
m[c.context] = c.client
}
return m
}
caches := func() map[string]cache.Cache {
m := make(map[string]cache.Cache)
for _, c := range mcc {
m[c.context] = c.cache
}
return m
}

return &manager{
cli: NewClient(cli, clients),
caches: caches,
cli: NewClient(cli, clients()),
caches: caches(),
}, nil
}

func newClientNCache(scheme *runtime.Scheme, kubeConfig, contexts string) (map[string]client.Client, map[string]cache.Cache, error) {
clients := make(map[string]client.Client)
caches := make(map[string]cache.Cache)
// isSameContextWithControl checks whether the context is the same as the control cluster.
func isSameContextWithControl(cfg *rest.Config, mcc multiClusterContext) bool {
return cfg.Host == mcc.id
}

func newClientNCache(scheme *runtime.Scheme, kubeConfig, contexts string) (map[string]multiClusterContext, error) {
mcc := make(map[string]multiClusterContext, 0)
for _, context := range strings.Split(contexts, ",") {
cli, cache, err := newClientNCache4Context(scheme, kubeConfig, context)
cc, err := newClientNCache4Context(scheme, kubeConfig, context)
if err != nil {
return nil, nil, err
return nil, err
}
if cli != nil && cache != nil {
clients[context] = cli
caches[context] = cache
if cc != nil {
mcc[context] = *cc
}
}
return clients, caches, nil
return mcc, nil
}

func newClientNCache4Context(scheme *runtime.Scheme, kubeConfig, context string) (client.Client, cache.Cache, error) {
func newClientNCache4Context(scheme *runtime.Scheme, kubeConfig, context string) (*multiClusterContext, error) {
if len(context) == 0 {
return nil, nil, nil
return nil, nil
}

config, err := getConfigWithContext(kubeConfig, context)
if err != nil {
return nil, nil, fmt.Errorf("unable to get kubeconfig for context %s: %s", context, err.Error())
return nil, fmt.Errorf("unable to get kubeconfig for context %s: %s", context, err.Error())
}
if config.UserAgent == "" {
config.UserAgent = rest.DefaultKubernetesUserAgent()
}

clientOpts, err := clientOptions(scheme, context, config)
if err != nil {
return nil, nil, err
return nil, err
}

cli, err := client.New(config, clientOpts)
if err != nil {
return nil, nil, fmt.Errorf("unable to create Client for context %s: %s", context, err.Error())
return nil, fmt.Errorf("unable to create Client for context %s: %s", context, err.Error())
}
cache, err := cache.New(config, cacheOptions(clientOpts))
if err != nil {
return nil, nil, fmt.Errorf("unable to create Cache for context %s: %s", context, err.Error())
return nil, fmt.Errorf("unable to create Cache for context %s: %s", context, err.Error())
}
return cli, cache, nil
return &multiClusterContext{
context: context,
id: config.Host,
cache: cache,
client: cli,
}, nil
}

func getConfigWithContext(kubeConfig, context string) (*rest.Config, error) {
Expand Down
32 changes: 32 additions & 0 deletions pkg/controller/multicluster/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
Copyright (C) 2022-2024 ApeCloud Co., Ltd
This file is part of KubeBlocks project
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package multicluster

import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type multiClusterContext struct {
context string
id string
cache cache.Cache
client client.Client
}

0 comments on commit fe329ab

Please sign in to comment.