From bf5b01bae393e3ab91bb7921a1b13a7a9295b329 Mon Sep 17 00:00:00 2001 From: wangyelei Date: Wed, 13 Mar 2024 10:31:36 +0800 Subject: [PATCH] chore: support to configurate the qps and burst for client config (#6807) --- cmd/dataprotection/main.go | 9 +++++---- cmd/manager/main.go | 2 +- controllers/apps/cluster_controller.go | 3 ++- controllers/apps/opsrequest_controller.go | 5 +++++ controllers/dataprotection/backup_controller.go | 2 +- .../dataprotection/log_collection_controller.go | 4 ++-- controllers/dataprotection/types.go | 9 --------- .../replicatedstatemachine_controller.go | 10 ++++++++++ deploy/helm/templates/dataprotection.yaml | 12 ++++++++++++ deploy/helm/templates/deployment.yaml | 8 ++++++++ deploy/helm/values.yaml | 12 ++++++++++-- pkg/constant/const.go | 2 ++ pkg/controllerutil/util.go | 17 +++++++++++++++++ pkg/dataprotection/types/constant.go | 2 ++ pkg/viperx/viperx.go | 4 ++++ 15 files changed, 81 insertions(+), 20 deletions(-) diff --git a/cmd/dataprotection/main.go b/cmd/dataprotection/main.go index 733012c05e7..87be65ff1fa 100644 --- a/cmd/dataprotection/main.go +++ b/cmd/dataprotection/main.go @@ -24,6 +24,7 @@ import ( "flag" "fmt" "os" + "runtime" "strings" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) 
@@ -33,7 +34,7 @@ import ( snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1" "github.com/spf13/pflag" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" + k8sruntime "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" discoverycli "k8s.io/client-go/discovery" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -73,7 +74,7 @@ const ( ) var ( - scheme = runtime.NewScheme() + scheme = k8sruntime.NewScheme() setupLog = ctrl.Log.WithName("setup") ) @@ -105,6 +106,7 @@ func init() { viper.SetDefault(dptypes.CfgKeyExecWorkerServiceAccountName, "kubeblocks-dataprotection-exec-worker") viper.SetDefault(dptypes.CfgKeyWorkerServiceAccountAnnotations, "{}") viper.SetDefault(dptypes.CfgKeyWorkerClusterRoleName, "kubeblocks-dataprotection-worker-role") + viper.SetDefault(dptypes.CfgDataProtectionReconcileWorkers, runtime.NumCPU()) } func main() { @@ -178,8 +180,7 @@ func main() { if len(managedNamespaces) > 0 { setupLog.Info(fmt.Sprintf("managed namespaces: %s", managedNamespaces)) } - - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + mgr, err := ctrl.NewManager(intctrlutil.GeKubeRestConfig(), ctrl.Options{ Scheme: scheme, MetricsBindAddress: metricsAddr, Port: 9443, diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 7beb96db317..57ec310b106 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -272,7 +272,7 @@ func main() { enableLeaderElectionID = viper.GetString(leaderElectIDFlagKey.viperName()) kubeContexts = viper.GetString(kubeContextsFlagKey.viperName()) - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + mgr, err := ctrl.NewManager(intctrlutil.GeKubeRestConfig(), ctrl.Options{ Scheme: scheme, MetricsBindAddress: metricsAddr, Port: 9443, diff --git a/controllers/apps/cluster_controller.go b/controllers/apps/cluster_controller.go index b7cd2a5f259..22cc7a01df4 100644 --- a/controllers/apps/cluster_controller.go +++ 
b/controllers/apps/cluster_controller.go @@ -21,6 +21,7 @@ package apps import ( "context" + "math" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -171,7 +172,7 @@ func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { return intctrlutil.NewNamespacedControllerManagedBy(mgr). For(&appsv1alpha1.Cluster{}). WithOptions(controller.Options{ - MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers), + MaxConcurrentReconciles: int(math.Ceil(viper.GetFloat64(constant.CfgKBReconcileWorkers) / 4)), }). Owns(&appsv1alpha1.Component{}). Owns(&corev1.Service{}). // cluster services diff --git a/controllers/apps/opsrequest_controller.go b/controllers/apps/opsrequest_controller.go index 3fdd342cf38..7d61901b1c6 100644 --- a/controllers/apps/opsrequest_controller.go +++ b/controllers/apps/opsrequest_controller.go @@ -21,6 +21,7 @@ package apps import ( "context" + "math" "reflect" "time" @@ -34,6 +35,7 @@ import ( "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" @@ -86,6 +88,9 @@ func (r *OpsRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) func (r *OpsRequestReconciler) SetupWithManager(mgr ctrl.Manager) error { return intctrlutil.NewNamespacedControllerManagedBy(mgr). For(&appsv1alpha1.OpsRequest{}). + WithOptions(controller.Options{ + MaxConcurrentReconciles: int(math.Ceil(viper.GetFloat64(constant.CfgKBReconcileWorkers) / 2)), + }). Watches(&appsv1alpha1.Cluster{}, handler.EnqueueRequestsFromMapFunc(r.parseFirstOpsRequest)). Watches(&workloadsv1alpha1.ReplicatedStateMachine{}, handler.EnqueueRequestsFromMapFunc(r.parseFirstOpsRequestForRSM)). Watches(&dpv1alpha1.Backup{}, handler.EnqueueRequestsFromMapFunc(r.parseBackupOpsRequest)). 
diff --git a/controllers/dataprotection/backup_controller.go b/controllers/dataprotection/backup_controller.go index f115623ea5b..9b8f0ff4ef7 100644 --- a/controllers/dataprotection/backup_controller.go +++ b/controllers/dataprotection/backup_controller.go @@ -134,7 +134,7 @@ func (r *BackupReconciler) SetupWithManager(mgr ctrl.Manager) error { b := intctrlutil.NewNamespacedControllerManagedBy(mgr). For(&dpv1alpha1.Backup{}). WithOptions(controller.Options{ - MaxConcurrentReconciles: viper.GetInt(maxConcurDataProtectionReconKey), + MaxConcurrentReconciles: viper.GetInt(dptypes.CfgDataProtectionReconcileWorkers), }). Owns(&appsv1.StatefulSet{}). Owns(&batchv1.Job{}). diff --git a/controllers/dataprotection/log_collection_controller.go b/controllers/dataprotection/log_collection_controller.go index b1b1551db84..0237d6feac7 100644 --- a/controllers/dataprotection/log_collection_controller.go +++ b/controllers/dataprotection/log_collection_controller.go @@ -22,9 +22,9 @@ package dataprotection import ( "context" "fmt" - "runtime" "strings" + "github.com/spf13/viper" "golang.org/x/exp/slices" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -107,7 +107,7 @@ func (r *LogCollectionReconciler) SetupWithManager(mgr ctrl.Manager) error { r: r, })). WithOptions(controller.Options{ - MaxConcurrentReconciles: runtime.NumCPU(), + MaxConcurrentReconciles: viper.GetInt(dptypes.CfgDataProtectionReconcileWorkers), }). Complete(r) } diff --git a/controllers/dataprotection/types.go b/controllers/dataprotection/types.go index 9d0bd845374..9c5e769fd6c 100644 --- a/controllers/dataprotection/types.go +++ b/controllers/dataprotection/types.go @@ -20,12 +20,9 @@ along with this program. If not, see . 
package dataprotection import ( - "runtime" "time" corev1 "k8s.io/api/core/v1" - - viper "github.com/apecloud/kubeblocks/pkg/viperx" ) const ( @@ -33,8 +30,6 @@ const ( ) const ( - // settings keys - maxConcurDataProtectionReconKey = "MAXCONCURRENTRECONCILES_DATAPROTECTION" // label keys dataProtectionBackupRepoKey = "dataprotection.kubeblocks.io/backup-repo-name" @@ -100,7 +95,3 @@ const ( ) var reconcileInterval = time.Second - -func init() { - viper.SetDefault(maxConcurDataProtectionReconKey, runtime.NumCPU()*2) -} diff --git a/controllers/workloads/replicatedstatemachine_controller.go b/controllers/workloads/replicatedstatemachine_controller.go index bc53a8d857c..4861bd5bc37 100644 --- a/controllers/workloads/replicatedstatemachine_controller.go +++ b/controllers/workloads/replicatedstatemachine_controller.go @@ -30,6 +30,7 @@ import ( "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/log" workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1" @@ -162,6 +163,9 @@ func (r *ReplicatedStateMachineReconciler) SetupWithManager(mgr ctrl.Manager) er return intctrlutil.NewNamespacedControllerManagedBy(mgr). For(&workloads.ReplicatedStateMachine{}). + WithOptions(controller.Options{ + MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers), + }). Watches(&appsv1.StatefulSet{}, stsHandler). Watches(&batchv1.Job{}, jobHandler). Watches(&corev1.Pod{}, podHandler). @@ -176,6 +180,9 @@ func (r *ReplicatedStateMachineReconciler) SetupWithManager(mgr ctrl.Manager) er podHandler := handler.NewBuilder(ctx).AddFinder(delegatorFinder).Build() return intctrlutil.NewNamespacedControllerManagedBy(mgr). For(&workloads.ReplicatedStateMachine{}). + WithOptions(controller.Options{ + MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers), + }). Watches(&batchv1.Job{}, jobHandler). 
Watches(&corev1.Pod{}, podHandler). Complete(r) @@ -186,6 +193,9 @@ func (r *ReplicatedStateMachineReconciler) SetupWithManager(mgr ctrl.Manager) er podHandler := handler.NewBuilder(ctx).AddFinder(stsOwnerFinder).AddFinder(rsmOwnerFinder).Build() return intctrlutil.NewNamespacedControllerManagedBy(mgr). For(&workloads.ReplicatedStateMachine{}). + WithOptions(controller.Options{ + MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers), + }). Owns(&appsv1.StatefulSet{}). Owns(&batchv1.Job{}). Watches(&corev1.Pod{}, podHandler). diff --git a/deploy/helm/templates/dataprotection.yaml b/deploy/helm/templates/dataprotection.yaml index 74f99160abf..48c9dd88bce 100644 --- a/deploy/helm/templates/dataprotection.yaml +++ b/deploy/helm/templates/dataprotection.yaml @@ -93,6 +93,18 @@ spec: - name: ENABLE_WEBHOOKS value: "true" {{- end }} + {{- if .Values.dataProtection.reconcileWorkers }} + - name: DATAPROTECTION_RECONCILE_WORKERS + value: {{ .Values.dataProtection.reconcileWorkers | quote }} + {{- end }} + {{- if .Values.client.qps }} + - name: CLIENT_QPS + value: {{ .Values.client.qps | quote }} + {{- end }} + {{- if .Values.client.burst }} + - name: CLIENT_BURST + value: {{ .Values.client.burst | quote }} + {{- end }} - name: DP_ENCRYPTION_KEY valueFrom: secretKeyRef: diff --git a/deploy/helm/templates/deployment.yaml b/deploy/helm/templates/deployment.yaml index 1eb0c0f9249..18ff1a43035 100644 --- a/deploy/helm/templates/deployment.yaml +++ b/deploy/helm/templates/deployment.yaml @@ -79,6 +79,14 @@ spec: - name: KUBEBLOCKS_RECONCILE_WORKERS value: {{ .Values.reconcileWorkers | quote }} {{- end }} + {{- if .Values.client.qps }} + - name: CLIENT_QPS + value: {{ .Values.client.qps | quote }} + {{- end }} + {{- if .Values.client.burst }} + - name: CLIENT_BURST + value: {{ .Values.client.burst | quote }} + {{- end }} {{- with .Values.nodeSelector }} - name: CM_NODE_SELECTOR value: {{ toJson . 
| quote }} diff --git a/deploy/helm/values.yaml b/deploy/helm/values.yaml index 45b6f86b013..4ef477c2c52 100644 --- a/deploy/helm/values.yaml +++ b/deploy/helm/values.yaml @@ -27,10 +27,17 @@ image: ## replicaCount: 1 -## MaxConcurrentReconciles for cluster and component controllers. +## MaxConcurrentReconciles for component and rsm controllers; the opsRequest and cluster controllers use ceil(value/2) and ceil(value/4) respectively. ## reconcileWorkers: "" +## k8s client configuration. +client: + # default is 20 + qps: "" + # default is 30 + burst: "" + ## @param nameOverride ## nameOverride: "" @@ -318,7 +325,8 @@ dataProtection: # if 'get/list' role of the backup CR are compromised. encryptionKey: "" gcFrequencySeconds: 3600 - + ## MaxConcurrentReconciles for backup and log collection controllers. + reconcileWorkers: "" worker: serviceAccount: # The name of the service account for worker pods. diff --git a/pkg/constant/const.go b/pkg/constant/const.go index 4252465a76a..41bf9e4cc30 100644 --- a/pkg/constant/const.go +++ b/pkg/constant/const.go @@ -48,6 +48,8 @@ const ( CfgKeyDPEncryptionKey = "DP_ENCRYPTION_KEY" CfgKBReconcileWorkers = "KUBEBLOCKS_RECONCILE_WORKERS" + CfgClientQPS = "CLIENT_QPS" + CfgClientBurst = "CLIENT_BURST" ) const ( diff --git a/pkg/controllerutil/util.go b/pkg/controllerutil/util.go index 5c799a87452..1454cab45e2 100644 --- a/pkg/controllerutil/util.go +++ b/pkg/controllerutil/util.go @@ -26,10 +26,14 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" + "github.com/apecloud/kubeblocks/pkg/constant" + viper "github.com/apecloud/kubeblocks/pkg/viperx" ) // GetUncachedObjects returns a list of K8s objects, for these object types, @@ -140,3 +144,16 @@ func SetOwnerReference(owner, object metav1.Object) error { func 
SetControllerReference(owner, object metav1.Object) error { return controllerutil.SetControllerReference(owner, object, innerScheme) } + +func GeKubeRestConfig() *rest.Config { + cfg := ctrl.GetConfigOrDie() + clientQPS := viper.GetFloat64(constant.CfgClientQPS) + if clientQPS != 0 { + cfg.QPS = float32(clientQPS) + } + clientBurst := viper.GetInt(constant.CfgClientBurst) + if clientBurst != 0 { + cfg.Burst = clientBurst + } + return cfg +} diff --git a/pkg/dataprotection/types/constant.go b/pkg/dataprotection/types/constant.go index 2e15ce718c4..8f35dd570f6 100644 --- a/pkg/dataprotection/types/constant.go +++ b/pkg/dataprotection/types/constant.go @@ -33,6 +33,8 @@ const ( CfgKeyWorkerServiceAccountAnnotations = "WORKER_SERVICE_ACCOUNT_ANNOTATIONS" // CfgKeyWorkerClusterRoleName is the key of cluster role name for binding the service account of the worker CfgKeyWorkerClusterRoleName = "WORKER_CLUSTER_ROLE_NAME" + // CfgDataProtectionReconcileWorkers is the key of the max reconcile workers used for MaxConcurrentReconciles + CfgDataProtectionReconcileWorkers = "DATAPROTECTION_RECONCILE_WORKERS" ) // config default values diff --git a/pkg/viperx/viperx.go b/pkg/viperx/viperx.go index 0bad1312ca9..620fc2ae4ba 100644 --- a/pkg/viperx/viperx.go +++ b/pkg/viperx/viperx.go @@ -47,6 +47,10 @@ func GetInt(key string) int { return rCall(key, viper.GetInt) } +func GetFloat64(key string) float64 { + return rCall(key, viper.GetFloat64) +} + func GetInt32(key string) int32 { return rCall(key, viper.GetInt32) }