From 38b124376acd25e8f4f11bfaeeedc9a0b1e1d976 Mon Sep 17 00:00:00 2001 From: kizuna-lek Date: Fri, 13 Dec 2024 15:18:01 +0800 Subject: [PATCH] fix: sharding reconfigure --- apis/apps/v1alpha1/opsrequest_webhook.go | 62 ++++++++++++++++++---- controllers/apps/operations/reconfigure.go | 32 ++++++++++- 2 files changed, 83 insertions(+), 11 deletions(-) diff --git a/apis/apps/v1alpha1/opsrequest_webhook.go b/apis/apps/v1alpha1/opsrequest_webhook.go index fef01222300..f4a765007c8 100644 --- a/apis/apps/v1alpha1/opsrequest_webhook.go +++ b/apis/apps/v1alpha1/opsrequest_webhook.go @@ -190,6 +190,20 @@ func (r *OpsRequest) getConfigMap(ctx context.Context, return cmObj, nil } +func (r *OpsRequest) getShardingComponents(ctx context.Context, + k8sClient client.Client, + cluster *Cluster, shardingName string) ([]Component, error) { + compList := &ComponentList{} + ml := client.MatchingLabels{ + constant.AppInstanceLabelKey: cluster.Name, + constant.KBAppShardingNameLabelKey: shardingName, + } + if err := k8sClient.List(ctx, compList, client.InNamespace(cluster.Namespace), ml); err != nil { + return nil, err + } + return compList.Items, nil +} + // Validate validates OpsRequest func (r *OpsRequest) Validate(ctx context.Context, k8sClient client.Client, @@ -375,24 +389,54 @@ func (r *OpsRequest) validateReconfigureParams(ctx context.Context, k8sClient client.Client, cluster *Cluster, reconfigure *Reconfigure) error { - if cluster.Spec.GetComponentByName(reconfigure.ComponentName) == nil { + if cluster.Spec.GetComponentByName(reconfigure.ComponentName) == nil && + cluster.Spec.GetShardingByName(reconfigure.ComponentName) == nil { return fmt.Errorf("component %s not found", reconfigure.ComponentName) } - for _, configuration := range reconfigure.Configurations { - cmObj, err := r.getConfigMap(ctx, k8sClient, fmt.Sprintf("%s-%s-%s", r.Spec.GetClusterName(), reconfigure.ComponentName, configuration.Name)) + + shardingNameMap := map[string][]Component{} + for _, shardingSpec := range cluster.Spec.ShardingSpecs { + shardingComponents, err := r.getShardingComponents(ctx, k8sClient, cluster, shardingSpec.Name) if err != nil { return err } - for _, key := range configuration.Keys { - // check add file - if _, ok := cmObj.Data[key.Key]; !ok && key.FileContent == "" { - return errors.Errorf("key %s not found in configmap %s", key.Key, configuration.Name) + shardingNameMap[shardingSpec.Name] = shardingComponents + } + + validateConfiguration := func(reconfigure *Reconfigure) error { + for _, configuration := range reconfigure.Configurations { + cmObj, err := r.getConfigMap(ctx, k8sClient, fmt.Sprintf("%s-%s-%s", r.Spec.GetClusterName(), reconfigure.ComponentName, configuration.Name)) + if err != nil { + return err } - if key.FileContent == "" && len(key.Parameters) == 0 { - return errors.New("key.fileContent and key.parameters cannot be empty at the same time") + for _, key := range configuration.Keys { + // check add file + if _, ok := cmObj.Data[key.Key]; !ok && key.FileContent == "" { + return errors.Errorf("key %s not found in configmap %s", key.Key, configuration.Name) + } + if key.FileContent == "" && len(key.Parameters) == 0 { + return errors.New("key.fileContent and key.parameters cannot be empty at the same time") + } } } + return nil } + + if _, ok := shardingNameMap[reconfigure.ComponentName]; ok { + for _, shardingComponents := range shardingNameMap[reconfigure.ComponentName] { + if err := validateConfiguration(&Reconfigure{ + ComponentOps: ComponentOps{ + ComponentName: shardingComponents.Labels[constant.KBAppComponentLabelKey], + }, + Configurations: reconfigure.Configurations, + }); err != nil { + return err + } + } + } else { + return validateConfiguration(reconfigure) + } + return nil } diff --git a/controllers/apps/operations/reconfigure.go b/controllers/apps/operations/reconfigure.go index 6dea9ce5a1b..083be121710 100644 --- a/controllers/apps/operations/reconfigure.go +++ b/controllers/apps/operations/reconfigure.go @@ -30,6 +30,7 @@ import ( appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" "github.com/apecloud/kubeblocks/pkg/configuration/core" + "github.com/apecloud/kubeblocks/pkg/constant" configctrl "github.com/apecloud/kubeblocks/pkg/controller/configuration" intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" ) @@ -142,10 +143,37 @@ func (r *reconfigureAction) ReconcileAction(reqCtx intctrlutil.RequestCtx, cli c func fromReconfigureOperations(request appsv1alpha1.OpsRequestSpec, reqCtx intctrlutil.RequestCtx, cli client.Client, resource *OpsResource) (reconfigures []reconfigureParams) { var operations []appsv1alpha1.Reconfigure + shardingNameMap := map[string][]appsv1alpha1.Component{} + for _, shardingSpec := range resource.Cluster.Spec.ShardingSpecs { + components, err := intctrlutil.ListShardingComponents(reqCtx.Ctx, cli, resource.Cluster, shardingSpec.Name) + if err != nil { + continue + } + shardingNameMap[shardingSpec.Name] = components + } + + appendReconfigure := func(reconfigures ...appsv1alpha1.Reconfigure) { + for _, reconfigure := range reconfigures { + // Perform the same reconfigure operation on all shards + if _, ok := shardingNameMap[reconfigure.ComponentName]; ok { + for _, shardingComponents := range shardingNameMap[reconfigure.ComponentName] { + operations = append(operations, appsv1alpha1.Reconfigure{ + ComponentOps: appsv1alpha1.ComponentOps{ + ComponentName: shardingComponents.Labels[constant.KBAppComponentLabelKey], + }, + Configurations: reconfigure.Configurations, + }) + } + } else { + operations = append(operations, *request.Reconfigure) + } + } + } + if request.Reconfigure != nil { - operations = append(operations, *request.Reconfigure) + appendReconfigure(*request.Reconfigure) } - operations = append(operations, request.Reconfigures...) + appendReconfigure(request.Reconfigures...) for _, reconfigure := range operations { if len(reconfigure.Configurations) == 0 {