Skip to content

Commit

Permalink
chore: support to configurate the qps and burst for client config (#6807
Browse files Browse the repository at this point in the history
)
  • Loading branch information
wangyelei authored Mar 13, 2024
1 parent 4852c42 commit bf5b01b
Show file tree
Hide file tree
Showing 15 changed files with 81 additions and 20 deletions.
9 changes: 5 additions & 4 deletions cmd/dataprotection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"flag"
"fmt"
"os"
"runtime"
"strings"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
Expand All @@ -33,7 +34,7 @@ import (
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
discoverycli "k8s.io/client-go/discovery"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -73,7 +74,7 @@ const (
)

var (
scheme = runtime.NewScheme()
scheme = k8sruntime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)

Expand Down Expand Up @@ -105,6 +106,7 @@ func init() {
viper.SetDefault(dptypes.CfgKeyExecWorkerServiceAccountName, "kubeblocks-dataprotection-exec-worker")
viper.SetDefault(dptypes.CfgKeyWorkerServiceAccountAnnotations, "{}")
viper.SetDefault(dptypes.CfgKeyWorkerClusterRoleName, "kubeblocks-dataprotection-worker-role")
viper.SetDefault(dptypes.CfgDataProtectionReconcileWorkers, runtime.NumCPU())
}

func main() {
Expand Down Expand Up @@ -178,8 +180,7 @@ func main() {
if len(managedNamespaces) > 0 {
setupLog.Info(fmt.Sprintf("managed namespaces: %s", managedNamespaces))
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
mgr, err := ctrl.NewManager(intctrlutil.GeKubeRestConfig(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
Expand Down
2 changes: 1 addition & 1 deletion cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func main() {
enableLeaderElectionID = viper.GetString(leaderElectIDFlagKey.viperName())
kubeContexts = viper.GetString(kubeContextsFlagKey.viperName())

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
mgr, err := ctrl.NewManager(intctrlutil.GeKubeRestConfig(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
Expand Down
3 changes: 2 additions & 1 deletion controllers/apps/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package apps

import (
"context"
"math"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -171,7 +172,7 @@ func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&appsv1alpha1.Cluster{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers),
MaxConcurrentReconciles: int(math.Ceil(viper.GetFloat64(constant.CfgKBReconcileWorkers) / 4)),
}).
Owns(&appsv1alpha1.Component{}).
Owns(&corev1.Service{}). // cluster services
Expand Down
5 changes: 5 additions & 0 deletions controllers/apps/opsrequest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package apps

import (
"context"
"math"
"reflect"
"time"

Expand All @@ -34,6 +35,7 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -86,6 +88,9 @@ func (r *OpsRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
func (r *OpsRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&appsv1alpha1.OpsRequest{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: int(math.Ceil(viper.GetFloat64(constant.CfgKBReconcileWorkers) / 2)),
}).
Watches(&appsv1alpha1.Cluster{}, handler.EnqueueRequestsFromMapFunc(r.parseFirstOpsRequest)).
Watches(&workloadsv1alpha1.ReplicatedStateMachine{}, handler.EnqueueRequestsFromMapFunc(r.parseFirstOpsRequestForRSM)).
Watches(&dpv1alpha1.Backup{}, handler.EnqueueRequestsFromMapFunc(r.parseBackupOpsRequest)).
Expand Down
2 changes: 1 addition & 1 deletion controllers/dataprotection/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (r *BackupReconciler) SetupWithManager(mgr ctrl.Manager) error {
b := intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&dpv1alpha1.Backup{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: viper.GetInt(maxConcurDataProtectionReconKey),
MaxConcurrentReconciles: viper.GetInt(dptypes.CfgDataProtectionReconcileWorkers),
}).
Owns(&appsv1.StatefulSet{}).
Owns(&batchv1.Job{}).
Expand Down
4 changes: 2 additions & 2 deletions controllers/dataprotection/log_collection_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ package dataprotection
import (
"context"
"fmt"
"runtime"
"strings"

"github.com/spf13/viper"
"golang.org/x/exp/slices"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -107,7 +107,7 @@ func (r *LogCollectionReconciler) SetupWithManager(mgr ctrl.Manager) error {
r: r,
})).
WithOptions(controller.Options{
MaxConcurrentReconciles: runtime.NumCPU(),
MaxConcurrentReconciles: viper.GetInt(dptypes.CfgDataProtectionReconcileWorkers),
}).
Complete(r)
}
Expand Down
9 changes: 0 additions & 9 deletions controllers/dataprotection/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,16 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package dataprotection

import (
"runtime"
"time"

corev1 "k8s.io/api/core/v1"

viper "github.com/apecloud/kubeblocks/pkg/viperx"
)

const (
trueVal = "true"
)

const (
// settings keys
maxConcurDataProtectionReconKey = "MAXCONCURRENTRECONCILES_DATAPROTECTION"

// label keys
dataProtectionBackupRepoKey = "dataprotection.kubeblocks.io/backup-repo-name"
Expand Down Expand Up @@ -100,7 +95,3 @@ const (
)

var reconcileInterval = time.Second

func init() {
viper.SetDefault(maxConcurDataProtectionReconKey, runtime.NumCPU()*2)
}
10 changes: 10 additions & 0 deletions controllers/workloads/replicatedstatemachine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"

workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
Expand Down Expand Up @@ -162,6 +163,9 @@ func (r *ReplicatedStateMachineReconciler) SetupWithManager(mgr ctrl.Manager) er

return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&workloads.ReplicatedStateMachine{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers),
}).
Watches(&appsv1.StatefulSet{}, stsHandler).
Watches(&batchv1.Job{}, jobHandler).
Watches(&corev1.Pod{}, podHandler).
Expand All @@ -176,6 +180,9 @@ func (r *ReplicatedStateMachineReconciler) SetupWithManager(mgr ctrl.Manager) er
podHandler := handler.NewBuilder(ctx).AddFinder(delegatorFinder).Build()
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&workloads.ReplicatedStateMachine{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers),
}).
Watches(&batchv1.Job{}, jobHandler).
Watches(&corev1.Pod{}, podHandler).
Complete(r)
Expand All @@ -186,6 +193,9 @@ func (r *ReplicatedStateMachineReconciler) SetupWithManager(mgr ctrl.Manager) er
podHandler := handler.NewBuilder(ctx).AddFinder(stsOwnerFinder).AddFinder(rsmOwnerFinder).Build()
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&workloads.ReplicatedStateMachine{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers),
}).
Owns(&appsv1.StatefulSet{}).
Owns(&batchv1.Job{}).
Watches(&corev1.Pod{}, podHandler).
Expand Down
12 changes: 12 additions & 0 deletions deploy/helm/templates/dataprotection.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ spec:
- name: ENABLE_WEBHOOKS
value: "true"
{{- end }}
{{- if .Values.reconcileWorkers }}
- name: DATAPROTECTION_RECONCILE_WORKERS
value: {{ .Values.dataProtection.reconcileWorkers | quote }}
{{- end }}
{{- if .Values.client.qps }}
- name: CLIENT_QPS
value: {{ .Values.client.qps | quote }}
{{- end }}
{{- if .Values.client.burst }}
- name: CLIENT_BURST
value: {{ .Values.client.burst | quote }}
{{- end }}
- name: DP_ENCRYPTION_KEY
valueFrom:
secretKeyRef:
Expand Down
8 changes: 8 additions & 0 deletions deploy/helm/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ spec:
- name: KUBEBLOCKS_RECONCILE_WORKERS
value: {{ .Values.reconcileWorkers | quote }}
{{- end }}
{{- if .Values.client.qps }}
- name: CLIENT_QPS
value: {{ .Values.client.qps | quote }}
{{- end }}
{{- if .Values.client.burst }}
- name: CLIENT_BURST
value: {{ .Values.client.burst | quote }}
{{- end }}
{{- with .Values.nodeSelector }}
- name: CM_NODE_SELECTOR
value: {{ toJson . | quote }}
Expand Down
12 changes: 10 additions & 2 deletions deploy/helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,17 @@ image:
##
replicaCount: 1

## MaxConcurrentReconciles for cluster and component controllers.
## MaxConcurrentReconciles for component, rsm and opsRequest controllers.
##
reconcileWorkers: ""

## k8s client configuration.
client:
# default is 20
qps: ""
# default is 30
burst: ""

## @param nameOverride
##
nameOverride: ""
Expand Down Expand Up @@ -318,7 +325,8 @@ dataProtection:
# if 'get/list' role of the backup CR are compromised.
encryptionKey: ""
gcFrequencySeconds: 3600

## MaxConcurrentReconciles for backup controller.
reconcileWorkers: ""
worker:
serviceAccount:
# The name of the service account for worker pods.
Expand Down
2 changes: 2 additions & 0 deletions pkg/constant/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
CfgKeyDPEncryptionKey = "DP_ENCRYPTION_KEY"

CfgKBReconcileWorkers = "KUBEBLOCKS_RECONCILE_WORKERS"
CfgClientQPS = "CLIENT_QPS"
CfgClientBurst = "CLIENT_BURST"
)

const (
Expand Down
17 changes: 17 additions & 0 deletions pkg/controllerutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
)

// GetUncachedObjects returns a list of K8s objects, for these object types,
Expand Down Expand Up @@ -140,3 +144,16 @@ func SetOwnerReference(owner, object metav1.Object) error {
func SetControllerReference(owner, object metav1.Object) error {
return controllerutil.SetControllerReference(owner, object, innerScheme)
}

func GeKubeRestConfig() *rest.Config {
cfg := ctrl.GetConfigOrDie()
clientQPS := viper.GetInt(constant.CfgClientQPS)
if clientQPS != 0 {
cfg.QPS = float32(clientQPS)
}
clientBurst := viper.GetInt(constant.CfgClientBurst)
if clientBurst != 0 {
cfg.Burst = clientBurst
}
return cfg
}
2 changes: 2 additions & 0 deletions pkg/dataprotection/types/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const (
CfgKeyWorkerServiceAccountAnnotations = "WORKER_SERVICE_ACCOUNT_ANNOTATIONS"
// CfgKeyWorkerClusterRoleName is the key of cluster role name for binding the service account of the worker
CfgKeyWorkerClusterRoleName = "WORKER_CLUSTER_ROLE_NAME"
// CfgDataProtectionReconcileWorkers the max reconcile workers for MaxConcurrentReconciles
CfgDataProtectionReconcileWorkers = "DATAPROTECTION_RECONCILE_WORKERS"
)

// config default values
Expand Down
4 changes: 4 additions & 0 deletions pkg/viperx/viperx.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func GetInt(key string) int {
return rCall(key, viper.GetInt)
}

func GetFloat64(key string) float64 {
return rCall(key, viper.GetFloat64)
}

func GetInt32(key string) int32 {
return rCall(key, viper.GetInt32)
}
Expand Down

0 comments on commit bf5b01b

Please sign in to comment.