Skip to content

Commit

Permalink
fix: remove dependency of Reconfigure on StatefulSet (#6895)
Browse files Browse the repository at this point in the history
  • Loading branch information
free6om authored Mar 27, 2024
1 parent 2b22122 commit 135e94e
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 149 deletions.
4 changes: 2 additions & 2 deletions controllers/apps/configuration/combine_upgrade_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var _ = Describe("Reconfigure CombineSyncPolicy", func() {
Expect(upgradePolicyMap[appsv1alpha1.DynamicReloadAndRestartPolicy]).ShouldNot(BeNil())

mockParam := newMockReconfigureParams("simplePolicy", k8sMockClient.Client(),
withMockStatefulSet(2, nil),
withMockRSM(2, nil),
withConfigSpec("for_test", map[string]string{
"key": "value",
}),
Expand All @@ -77,7 +77,7 @@ var _ = Describe("Reconfigure CombineSyncPolicy", func() {
}

mockParam := newMockReconfigureParams("simplePolicy", k8sMockClient.Client(),
withMockStatefulSet(2, nil),
withMockRSM(2, nil),
withConfigSpec("for_test", map[string]string{
"key": "value",
}),
Expand Down
21 changes: 2 additions & 19 deletions controllers/apps/configuration/config_reconcile_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package configuration

import (
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/controller/component"
configctrl "github.com/apecloud/kubeblocks/pkg/controller/configuration"
rsmcore "github.com/apecloud/kubeblocks/pkg/controller/rsm"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
"github.com/apecloud/kubeblocks/pkg/generics"
)
Expand All @@ -40,10 +38,8 @@ type configReconcileContext struct {
ConfigMap *corev1.ConfigMap
BuiltinComponent *component.SynthesizedComponent

Containers []string
StatefulSets []appv1.StatefulSet
RSMList []workloads.ReplicatedStateMachine
Deployments []appv1.Deployment
Containers []string
RSMList []workloads.ReplicatedStateMachine

reqCtx intctrlutil.RequestCtx
}
Expand Down Expand Up @@ -81,19 +77,6 @@ func (c *configReconcileContext) RSM() *configReconcileContext {
client.ObjectKeyFromObject(c.ConfigMap),
client.InNamespace(c.Namespace),
c.MatchingLabels)
if err != nil {
return
}

// fix uid mismatch bug: convert rsm to sts
// NODE: all components use the StatefulSet
for _, rsm := range c.RSMList {
var stsObject appv1.StatefulSet
if err = c.Client.Get(c.Context, client.ObjectKeyFromObject(rsmcore.ConvertRSMToSTS(&rsm)), &stsObject); err != nil {
return
}
c.StatefulSets = append(c.StatefulSets, stsObject)
}
return
}
return c.Wrap(stsFn)
Expand Down
16 changes: 8 additions & 8 deletions controllers/apps/configuration/parallel_upgrade_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ var _ = Describe("Reconfigure ParallelPolicy", func() {
withGRPCClient(func(addr string) (cfgproto.ReconfigureClient, error) {
return reconfigureClient, nil
}),
withMockStatefulSet(3, nil),
withMockRSM(3, nil),
withClusterComponent(3),
withConfigSpec("for_test", map[string]string{
"a": "b",
Expand All @@ -79,7 +79,7 @@ var _ = Describe("Reconfigure ParallelPolicy", func() {

k8sMockClient.MockListMethod(testutil.WithListReturned(
testutil.WithConstructListReturnedResult(fromPodObjectList(
newMockPodsWithStatefulSet(&mockParam.ComponentUnits[0], 3),
newMockPodsWithRSM(&mockParam.RSMUnits[0], 3),
))))

status, err := parallelPolicy.Upgrade(mockParam)
Expand All @@ -94,7 +94,7 @@ var _ = Describe("Reconfigure ParallelPolicy", func() {
withGRPCClient(func(addr string) (cfgproto.ReconfigureClient, error) {
return reconfigureClient, nil
}),
withMockStatefulSet(3, nil),
withMockRSM(3, nil),
withClusterComponent(3),
withConfigSpec("for_test", map[string]string{
"a": "b",
Expand Down Expand Up @@ -134,7 +134,7 @@ var _ = Describe("Reconfigure ParallelPolicy", func() {
withGRPCClient(func(addr string) (cfgproto.ReconfigureClient, error) {
return reconfigureClient, nil
}),
withMockStatefulSet(3, nil),
withMockRSM(3, nil),
withClusterComponent(3),
withConfigSpec("for_test", map[string]string{
"a": "b",
Expand All @@ -147,7 +147,7 @@ var _ = Describe("Reconfigure ParallelPolicy", func() {

k8sMockClient.MockListMethod(testutil.WithListReturned(
testutil.WithConstructListReturnedResult(
fromPodObjectList(newMockPodsWithStatefulSet(&mockParam.ComponentUnits[0], 3))), testutil.WithTimes(2),
fromPodObjectList(newMockPodsWithRSM(&mockParam.RSMUnits[0], 3))), testutil.WithTimes(2),
))

status, err := parallelPolicy.Upgrade(mockParam)
Expand Down Expand Up @@ -176,7 +176,7 @@ var _ = Describe("Reconfigure ParallelPolicy", func() {
withGRPCClient(func(addr string) (cfgproto.ReconfigureClient, error) {
return reconfigureClient, nil
}),
withMockStatefulSet(3, nil),
withMockRSM(3, nil),
withClusterComponent(3),
withConfigSpec("for_test", map[string]string{
"a": "b",
Expand All @@ -187,7 +187,7 @@ var _ = Describe("Reconfigure ParallelPolicy", func() {
VolumeName: "test_volume",
}}}))

setPods := newMockPodsWithStatefulSet(&mockParam.ComponentUnits[0], 5)
setPods := newMockPodsWithRSM(&mockParam.RSMUnits[0], 5)
k8sMockClient.MockListMethod(testutil.WithListReturned(
testutil.WithConstructListReturnedResult(fromPodObjectList(setPods)), testutil.WithAnyTimes(),
))
Expand All @@ -204,7 +204,7 @@ var _ = Describe("Reconfigure ParallelPolicy", func() {
It("Should failed", func() {
// not support type
mockParam := newMockReconfigureParams("parallelPolicy", k8sMockClient.Client(),
withMockStatefulSet(2, nil),
withMockRSM(2, nil),
withConfigSpec("for_test", map[string]string{
"key": "value",
}),
Expand Down
10 changes: 5 additions & 5 deletions controllers/apps/configuration/policy_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ import (
// GetComponentPods gets all pods of the component.
func GetComponentPods(params reconfigureParams) ([]corev1.Pod, error) {
componentPods := make([]corev1.Pod, 0)
for i := range params.ComponentUnits {
pods, err := intctrlutil.GetPodListByStatefulSet(params.Ctx.Ctx, params.Client, &params.ComponentUnits[i])
for i := range params.RSMUnits {
pods, err := intctrlutil.GetPodListByRSM(params.Ctx.Ctx, params.Client, &params.RSMUnits[i])
if err != nil {
return nil, err
}
Expand All @@ -72,11 +72,11 @@ func CheckReconfigureUpdateProgress(pods []corev1.Pod, configKey, version string
}

func getPodsForOnlineUpdate(params reconfigureParams) ([]corev1.Pod, error) {
if len(params.ComponentUnits) > 1 {
return nil, core.MakeError("component require only one statefulSet, actual %d components", len(params.ComponentUnits))
if len(params.RSMUnits) > 1 {
return nil, core.MakeError("component require only one rsm, actual %d components", len(params.RSMUnits))
}

if len(params.ComponentUnits) == 0 {
if len(params.RSMUnits) == 0 {
return nil, nil
}

Expand Down
81 changes: 11 additions & 70 deletions controllers/apps/configuration/policy_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,53 +46,20 @@ var (
stsSchemaKind = appsv1.SchemeGroupVersion.WithKind("StatefulSet")
)

func newMockDeployments(replicas int, name string, labels map[string]string) appsv1.Deployment {
uid, _ := password.Generate(12, 12, 0, true, false)
return appsv1.Deployment{
TypeMeta: metav1.TypeMeta{
Kind: "StatefulSet",
APIVersion: "apps/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: defaultNamespace,
UID: types.UID(uid),
},
Spec: appsv1.DeploymentSpec{
Replicas: func() *int32 { i := int32(replicas); return &i }(),
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{},
Volumes: []corev1.Volume{{
Name: "for_test",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/tmp",
},
}}},
},
},
},
}
}

func newMockStatefulSet(replicas int, name string, labels map[string]string) appsv1.StatefulSet {
func newMockRSM(replicas int, name string, labels map[string]string) workloads.ReplicatedStateMachine {
uid, _ := password.Generate(12, 12, 0, true, false)
serviceName, _ := password.Generate(12, 0, 0, true, false)
return appsv1.StatefulSet{
return workloads.ReplicatedStateMachine{
TypeMeta: metav1.TypeMeta{
Kind: "StatefulSet",
APIVersion: "apps/v1",
Kind: "ReplicatedStateMachine",
APIVersion: "workloads.kubeblocks.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: defaultNamespace,
UID: types.UID(uid),
},
Spec: appsv1.StatefulSetSpec{
Spec: workloads.ReplicatedStateMachineSpec{
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Expand All @@ -119,22 +86,12 @@ func newMockStatefulSet(replicas int, name string, labels map[string]string) app

type ParamsOps func(params *reconfigureParams)

func withMockStatefulSet(replicas int, labels map[string]string) ParamsOps {
func withMockRSM(replicas int, labels map[string]string) ParamsOps {
return func(params *reconfigureParams) {
rand, _ := password.Generate(12, 8, 0, true, false)
stsName := "test_" + rand
params.ComponentUnits = []appsv1.StatefulSet{
newMockStatefulSet(replicas, stsName, labels),
}
}
}

func withMockDeployments(replicas int, labels map[string]string) ParamsOps {
return func(params *reconfigureParams) {
rand, _ := password.Generate(12, 8, 0, true, false)
deployName := "test_" + rand
params.DeploymentUnits = []appsv1.Deployment{
newMockDeployments(replicas, deployName, labels),
params.RSMUnits = []workloads.ReplicatedStateMachine{
newMockRSM(replicas, stsName, labels),
}
}
}
Expand Down Expand Up @@ -262,27 +219,11 @@ func newMockReconfigureParams(testName string, cli client.Client, paramOps ...Pa
return params
}

func newMockPodsWithStatefulSet(sts *appsv1.StatefulSet, replicas int, options ...PodOptions) []corev1.Pod {
pods := make([]corev1.Pod, replicas)
for i := 0; i < replicas; i++ {
pods[i] = newMockPod(sts.Name+"-"+fmt.Sprint(i), &sts.Spec.Template.Spec)
pods[i].OwnerReferences = []metav1.OwnerReference{newControllerRef(sts, stsSchemaKind)}
pods[i].Status.PodIP = "1.1.1.1"
}
for _, customFn := range options {
for i := range pods {
pod := &pods[i]
customFn(pod, i)
}
}
return pods
}

func newMockPodsWithDeployment(deploy *appsv1.Deployment, replicas int, options ...PodOptions) []corev1.Pod {
func newMockPodsWithRSM(rsm *workloads.ReplicatedStateMachine, replicas int, options ...PodOptions) []corev1.Pod {
pods := make([]corev1.Pod, replicas)
for i := 0; i < replicas; i++ {
pods[i] = newMockPod(deploy.Name+"-"+fmt.Sprint(i), &deploy.Spec.Template.Spec)
pods[i].OwnerReferences = []metav1.OwnerReference{newControllerRef(deploy, stsSchemaKind)}
pods[i] = newMockPod(rsm.Name+"-"+fmt.Sprint(i), &rsm.Spec.Template.Spec)
pods[i].OwnerReferences = []metav1.OwnerReference{newControllerRef(rsm, stsSchemaKind)}
pods[i].Status.PodIP = "1.1.1.1"
}
for _, customFn := range options {
Expand Down
6 changes: 2 additions & 4 deletions controllers/apps/configuration/reconfigure_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (r *ReconfigureReconciler) sync(reqCtx intctrlutil.RequestCtx, configMap *c
return intctrlutil.Reconciled()
}

if len(reconcileContext.StatefulSets) == 0 && len(reconcileContext.RSMList) == 0 {
if len(reconcileContext.RSMList) == 0 {
reqCtx.Recorder.Event(configMap, corev1.EventTypeWarning, appsv1alpha1.ReasonReconfigureFailed,
"the configmap is not used by any container, skip reconfigure")
return updateConfigPhase(r.Client, reqCtx, configMap, appsv1alpha1.CFinishedPhase, configurationNotUsingMessage)
Expand All @@ -202,9 +202,7 @@ func (r *ReconfigureReconciler) sync(reqCtx intctrlutil.RequestCtx, configMap *c
Ctx: reqCtx,
Cluster: reconcileContext.ClusterObj,
ContainerNames: reconcileContext.Containers,
ComponentUnits: reconcileContext.StatefulSets,
DeploymentUnits: reconcileContext.Deployments,
RSMList: reconcileContext.RSMList,
RSMUnits: reconcileContext.RSMList,
ClusterComponent: reconcileContext.ClusterComObj,
SynthesizedComponent: reconcileContext.BuiltinComponent,
Restart: forceRestart || !cfgcm.IsSupportReload(resources.configConstraintObj.Spec.ReloadOptions),
Expand Down
21 changes: 3 additions & 18 deletions controllers/apps/configuration/reconfigure_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -111,12 +110,8 @@ type reconfigureParams struct {
// TODO(xingran): remove this field when test case is refactored.
Component *appsv1alpha1.ClusterComponentDefinition

// List of StatefulSets using this config template.
ComponentUnits []appsv1.StatefulSet
// List of Deployment using this config template.
DeploymentUnits []appsv1.Deployment
// List of ReplicatedStateMachine using this config template.
RSMList []workloads.ReplicatedStateMachine
RSMUnits []workloads.ReplicatedStateMachine
}

var (
Expand Down Expand Up @@ -168,7 +163,7 @@ func (param *reconfigureParams) maxRollingReplicas() int32 {
}

var maxUnavailable *intstr.IntOrString
for _, rsm := range param.RSMList {
for _, rsm := range param.RSMUnits {
if rsm.Spec.UpdateStrategy.RollingUpdate != nil {
maxUnavailable = rsm.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable
}
Expand Down Expand Up @@ -302,17 +297,7 @@ func makeReturnedStatus(status ExecStatus, ops ...func(status *ReturnedStatus))

func fromWorkloadObjects(params reconfigureParams) []client.Object {
r := make([]client.Object, 0)
for _, unit := range params.RSMList {
r = append(r, &unit)
}
// migrated workload
if len(r) != 0 {
return r
}
for _, unit := range params.ComponentUnits {
r = append(r, &unit)
}
for _, unit := range params.DeploymentUnits {
for _, unit := range params.RSMUnits {
r = append(r, &unit)
}
return r
Expand Down
10 changes: 5 additions & 5 deletions controllers/apps/configuration/rolling_upgrade_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var _ = Describe("Reconfigure RollingPolicy", func() {

createReconfigureParam := func(compType appsv1alpha1.WorkloadType, replicas int) reconfigureParams {
return newMockReconfigureParams("rollingPolicy", k8sMockClient.Client(),
withMockStatefulSet(replicas, nil),
withMockRSM(replicas, nil),
withConfigSpec("for_test", map[string]string{
"key": "value",
}),
Expand Down Expand Up @@ -105,10 +105,10 @@ var _ = Describe("Reconfigure RollingPolicy", func() {

acc := 0
mockPods := [][]corev1.Pod{
newMockPodsWithStatefulSet(&mockParam.ComponentUnits[0], 2),
newMockPodsWithStatefulSet(&mockParam.ComponentUnits[0], 5,
newMockPodsWithRSM(&mockParam.RSMUnits[0], 2),
newMockPodsWithRSM(&mockParam.RSMUnits[0], 5,
mockLeaderLabel),
newMockPodsWithStatefulSet(&mockParam.ComponentUnits[0], 3,
newMockPodsWithRSM(&mockParam.RSMUnits[0], 3,
withReadyPod(0, 0),
withAvailablePod(0, 3),
mockLeaderLabel),
Expand Down Expand Up @@ -184,7 +184,7 @@ var _ = Describe("Reconfigure RollingPolicy", func() {
},
},
}
pods = newMockPodsWithStatefulSet(&mockParam.ComponentUnits[0], defaultReplica)
pods = newMockPodsWithRSM(&mockParam.RSMUnits[0], defaultReplica)
}

k8sMockClient.MockListMethod(testutil.WithListReturned(
Expand Down
Loading

0 comments on commit 135e94e

Please sign in to comment.