Skip to content

Commit

Permalink
chore: switchover before updating or deleting pods
Browse files Browse the repository at this point in the history
  • Loading branch information
leon-inf committed Dec 31, 2024
1 parent d1b1007 commit 79187b1
Show file tree
Hide file tree
Showing 16 changed files with 89 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
"github.com/apecloud/kubeblocks/pkg/common"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/component/lifecycle"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/lifecycle"
"github.com/apecloud/kubeblocks/pkg/controller/model"
)

Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/transformer_component_post_provision.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"time"

"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/component/lifecycle"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/lifecycle"
"github.com/apecloud/kubeblocks/pkg/controller/model"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/transformer_component_pre_terminate.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/component/lifecycle"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/lifecycle"
"github.com/apecloud/kubeblocks/pkg/controller/model"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/transformer_component_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ import (
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/component/lifecycle"
"github.com/apecloud/kubeblocks/pkg/controller/configuration"
"github.com/apecloud/kubeblocks/pkg/controller/factory"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/lifecycle"
"github.com/apecloud/kubeblocks/pkg/controller/model"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)
Expand Down
26 changes: 26 additions & 0 deletions pkg/controller/instanceset/reconciler_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package instanceset

import (
"errors"
"fmt"
"time"

Expand All @@ -32,6 +33,7 @@ import (

workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
"github.com/apecloud/kubeblocks/pkg/controller/kubebuilderx"
"github.com/apecloud/kubeblocks/pkg/controller/lifecycle"
"github.com/apecloud/kubeblocks/pkg/controller/model"
)

Expand Down Expand Up @@ -95,6 +97,7 @@ func (r *updateReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilder
// 3. do update
// do nothing if UpdateStrategyType is 'OnDelete'
if its.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
// TODO: how to handle the OnDelete type?
return kubebuilderx.Continue, nil
}

Expand Down Expand Up @@ -176,12 +179,18 @@ func (r *updateReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilder
return kubebuilderx.Continue, err
}
newPod := copyAndMerge(pod, newInstance.pod)
if err = r.switchover(tree, its, newPod.(*corev1.Pod)); err != nil {
return kubebuilderx.Continue, err
}
if err = tree.Update(newPod); err != nil {
return kubebuilderx.Continue, err
}
updatingPods++
} else if updatePolicy == RecreatePolicy {
if !isTerminating(pod) {
if err = r.switchover(tree, its, pod); err != nil {
return kubebuilderx.Continue, err
}
if err = tree.Delete(pod); err != nil {
return kubebuilderx.Continue, err
}
Expand All @@ -199,6 +208,23 @@ func (r *updateReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilder
return kubebuilderx.Continue, nil
}

func (r *updateReconciler) switchover(tree *kubebuilderx.ObjectTree, its *workloads.InstanceSet, pod *corev1.Pod) error {
// TODO: lifecycle actions and template vars
lfa, err := lifecycle.New2(its.Namespace, its.Name, its.Name, nil, nil, pod)
if err != nil {
return err
}
err = lfa.Switchover(tree.Context, nil, nil, "")
if err != nil {
if errors.Is(err, lifecycle.ErrActionNotDefined) {
return nil
}
return err
}
tree.Logger.Info("successfully call switchover action for pod", "pod", pod.Name)
return nil
}

func buildBlockedCondition(its *workloads.InstanceSet, message string) *metav1.Condition {
return &metav1.Condition{
Type: string(workloads.InstanceUpdateRestricted),
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/instanceset/tree_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (r *treeLoader) Load(ctx context.Context, reader client.Reader, req ctrl.Re
return nil, err
}

tree.Context = ctx
tree.EventRecorder = recorder
tree.Logger = logger
tree.SetFinalizer(finalizer)
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/kubebuilderx/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

type ObjectTree struct {
// TODO(free6om): should find a better place to hold these two params?
context.Context
record.EventRecorder
logr.Logger

Expand Down Expand Up @@ -113,6 +114,7 @@ func (t *ObjectTree) DeepCopy() (*ObjectTree, error) {
}
out.children = children
out.finalizer = t.finalizer
out.Context = t.Context
out.EventRecorder = t.EventRecorder
out.Logger = t.Logger
return out, nil
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/instanceset"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
kbagt "github.com/apecloud/kubeblocks/pkg/kbagent"
Expand All @@ -47,68 +46,72 @@ type lifecycleAction interface {
}

type kbagent struct {
synthesizedComp *component.SynthesizedComponent
pods []*corev1.Pod
pod *corev1.Pod
namespace string
clusterName string
compName string
lifecycleActions *appsv1.ComponentLifecycleActions
templateVars map[string]any
pods []*corev1.Pod
pod *corev1.Pod
}

var _ Lifecycle = &kbagent{}

func (a *kbagent) PostProvision(ctx context.Context, cli client.Reader, opts *Options) error {
lfa := &postProvision{
namespace: a.synthesizedComp.Namespace,
clusterName: a.synthesizedComp.ClusterName,
compName: a.synthesizedComp.Name,
action: a.synthesizedComp.LifecycleActions.PostProvision,
namespace: a.namespace,
clusterName: a.clusterName,
compName: a.compName,
action: a.lifecycleActions.PostProvision,
}
return a.ignoreOutput(a.checkedCallAction(ctx, cli, lfa.action, lfa, opts))
}

func (a *kbagent) PreTerminate(ctx context.Context, cli client.Reader, opts *Options) error {
lfa := &preTerminate{
namespace: a.synthesizedComp.Namespace,
clusterName: a.synthesizedComp.ClusterName,
compName: a.synthesizedComp.Name,
action: a.synthesizedComp.LifecycleActions.PreTerminate,
namespace: a.namespace,
clusterName: a.clusterName,
compName: a.compName,
action: a.lifecycleActions.PreTerminate,
}
return a.ignoreOutput(a.checkedCallAction(ctx, cli, lfa.action, lfa, opts))
}

func (a *kbagent) RoleProbe(ctx context.Context, cli client.Reader, opts *Options) ([]byte, error) {
return a.checkedCallProbe(ctx, cli, a.synthesizedComp.LifecycleActions.RoleProbe, &roleProbe{}, opts)
return a.checkedCallProbe(ctx, cli, a.lifecycleActions.RoleProbe, &roleProbe{}, opts)
}

func (a *kbagent) Switchover(ctx context.Context, cli client.Reader, opts *Options, candidate string) error {
roleName := a.pod.Labels[constant.RoleLabelKey]
lfa := &switchover{
namespace: a.synthesizedComp.Namespace,
clusterName: a.synthesizedComp.ClusterName,
compName: a.synthesizedComp.Name,
namespace: a.namespace,
clusterName: a.clusterName,
compName: a.compName,
role: roleName,
currentPod: a.pod.Name,
candidatePod: candidate,
}
return a.ignoreOutput(a.checkedCallAction(ctx, cli, a.synthesizedComp.LifecycleActions.Switchover, lfa, opts))
return a.ignoreOutput(a.checkedCallAction(ctx, cli, a.lifecycleActions.Switchover, lfa, opts))
}

func (a *kbagent) MemberJoin(ctx context.Context, cli client.Reader, opts *Options) error {
lfa := &memberJoin{
namespace: a.synthesizedComp.Namespace,
clusterName: a.synthesizedComp.ClusterName,
compName: a.synthesizedComp.Name,
namespace: a.namespace,
clusterName: a.clusterName,
compName: a.compName,
pod: a.pod,
}
return a.ignoreOutput(a.checkedCallAction(ctx, cli, a.synthesizedComp.LifecycleActions.MemberJoin, lfa, opts))
return a.ignoreOutput(a.checkedCallAction(ctx, cli, a.lifecycleActions.MemberJoin, lfa, opts))
}

func (a *kbagent) MemberLeave(ctx context.Context, cli client.Reader, opts *Options) error {
lfa := &memberLeave{
namespace: a.synthesizedComp.Namespace,
clusterName: a.synthesizedComp.ClusterName,
compName: a.synthesizedComp.Name,
namespace: a.namespace,
clusterName: a.clusterName,
compName: a.compName,
pod: a.pod,
}
return a.ignoreOutput(a.checkedCallAction(ctx, cli, a.synthesizedComp.LifecycleActions.MemberLeave, lfa, opts))
return a.ignoreOutput(a.checkedCallAction(ctx, cli, a.lifecycleActions.MemberLeave, lfa, opts))
}

func (a *kbagent) AccountProvision(ctx context.Context, cli client.Reader, opts *Options, statement, user, password string) error {
Expand All @@ -117,7 +120,7 @@ func (a *kbagent) AccountProvision(ctx context.Context, cli client.Reader, opts
user: user,
password: password,
}
return a.ignoreOutput(a.checkedCallAction(ctx, cli, a.synthesizedComp.LifecycleActions.AccountProvision, lfa, opts))
return a.ignoreOutput(a.checkedCallAction(ctx, cli, a.lifecycleActions.AccountProvision, lfa, opts))
}

func (a *kbagent) ignoreOutput(_ []byte, err error) error {
Expand Down Expand Up @@ -165,20 +168,20 @@ func (a *kbagent) clusterReadyCheck(ctx context.Context, cli client.Reader) erro
cluster := object.(*appsv1.Cluster)
return cluster.Status.Phase == appsv1.RunningClusterPhase
}
return a.readyCheck(ctx, cli, a.synthesizedComp.ClusterName, "cluster", &appsv1.Cluster{}, ready)
return a.readyCheck(ctx, cli, a.clusterName, "cluster", &appsv1.Cluster{}, ready)
}

func (a *kbagent) compReadyCheck(ctx context.Context, cli client.Reader) error {
ready := func(object client.Object) bool {
comp := object.(*appsv1.Component)
return comp.Status.Phase == appsv1.RunningComponentPhase
}
compName := constant.GenerateClusterComponentName(a.synthesizedComp.ClusterName, a.synthesizedComp.Name)
compName := constant.GenerateClusterComponentName(a.clusterName, a.compName)
return a.readyCheck(ctx, cli, compName, "component", &appsv1.Component{}, ready)
}

func (a *kbagent) runtimeReadyCheck(ctx context.Context, cli client.Reader) error {
name := constant.GenerateWorkloadNamePattern(a.synthesizedComp.ClusterName, a.synthesizedComp.Name)
name := constant.GenerateWorkloadNamePattern(a.clusterName, a.compName)
ready := func(object client.Object) bool {
its := object.(*workloads.InstanceSet)
return instanceset.IsInstancesReady(its)
Expand All @@ -188,7 +191,7 @@ func (a *kbagent) runtimeReadyCheck(ctx context.Context, cli client.Reader) erro

func (a *kbagent) readyCheck(ctx context.Context, cli client.Reader, name, kind string, obj client.Object, ready func(object client.Object) bool) error {
key := types.NamespacedName{
Namespace: a.synthesizedComp.Namespace,
Namespace: a.namespace,
Name: name,
}
if err := cli.Get(ctx, key, obj); err != nil {
Expand Down Expand Up @@ -255,7 +258,7 @@ func (a *kbagent) parameters(ctx context.Context, cli client.Reader, lfa lifecyc

func (a *kbagent) templateVarsParameters() (map[string]string, error) {
m := map[string]string{}
for k, v := range a.synthesizedComp.TemplateVars {
for k, v := range a.templateVars {
m[k] = v.(string)
}
return m, nil
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ type Lifecycle interface {
}

func New(synthesizedComp *component.SynthesizedComponent, pod *corev1.Pod, pods ...*corev1.Pod) (Lifecycle, error) {
return New2(synthesizedComp.Namespace, synthesizedComp.ClusterName, synthesizedComp.Name,
synthesizedComp.LifecycleActions, synthesizedComp.TemplateVars, pod, pods...)
}

func New2(namespace, clusterName, compName string, lifecycleActions *appsv1.ComponentLifecycleActions,
templateVars map[string]any, pod *corev1.Pod, pods ...*corev1.Pod) (Lifecycle, error) {
if pod == nil && len(pods) == 0 {
return nil, fmt.Errorf("either pod or pods must be provided to call lifecycle actions")
}
Expand All @@ -69,8 +75,12 @@ func New(synthesizedComp *component.SynthesizedComponent, pod *corev1.Pod, pods
pods = []*corev1.Pod{pod}
}
return &kbagent{
synthesizedComp: synthesizedComp,
pods: pods,
pod: pod,
namespace: namespace,
clusterName: clusterName,
compName: compName,
lifecycleActions: lifecycleActions,
templateVars: templateVars,
pods: pods,
pod: pod,
}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,10 @@ var _ = Describe("lifecycle", func() {

Expect(lifecycle).ShouldNot(BeNil())
agent := lifecycle.(*kbagent)
Expect(agent.synthesizedComp).Should(Equal(synthesizedComp))
Expect(agent.namespace).Should(Equal(synthesizedComp.Namespace))
Expect(agent.clusterName).Should(Equal(synthesizedComp.ClusterName))
Expect(agent.compName).Should(Equal(synthesizedComp.Name))
Expect(agent.lifecycleActions).Should(Equal(synthesizedComp.LifecycleActions))
Expect(agent.pod).Should(Equal(pod))
Expect(agent.pods).Should(HaveLen(1))
Expect(agent.pods[0]).Should(Equal(pod))
Expand All @@ -184,7 +187,10 @@ var _ = Describe("lifecycle", func() {

Expect(lifecycle).ShouldNot(BeNil())
agent := lifecycle.(*kbagent)
Expect(agent.synthesizedComp).Should(Equal(synthesizedComp))
Expect(agent.namespace).Should(Equal(synthesizedComp.Namespace))
Expect(agent.clusterName).Should(Equal(synthesizedComp.ClusterName))
Expect(agent.compName).Should(Equal(synthesizedComp.Name))
Expect(agent.lifecycleActions).Should(Equal(synthesizedComp.LifecycleActions))
Expect(agent.pod).Should(Equal(pod))
Expect(agent.pods).Should(HaveLen(1))
Expect(agent.pods[0]).Should(Equal(pod))
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion pkg/operations/switchover.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
opsv1alpha1 "github.com/apecloud/kubeblocks/apis/operations/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/component/lifecycle"
"github.com/apecloud/kubeblocks/pkg/controller/lifecycle"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)

Expand Down

0 comments on commit 79187b1

Please sign in to comment.