Skip to content

Commit

Permalink
Add statefulset Partition controller
Browse files Browse the repository at this point in the history
Signed-off-by: d-kuro <[email protected]>
  • Loading branch information
d-kuro committed Jul 3, 2024
1 parent 1ed88f7 commit f665979
Show file tree
Hide file tree
Showing 23 changed files with 1,430 additions and 0 deletions.
99 changes: 99 additions & 0 deletions api/v1beta2/statefulset_webhhok.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package v1beta2

import (
"context"
"fmt"

"github.com/cybozu-go/moco/pkg/constants"
admissionv1 "k8s.io/api/admission/v1"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

func SetupStatefulSetWebhookWithManager(mgr ctrl.Manager) error {
return ctrl.NewWebhookManagedBy(mgr).
For(&appsv1.StatefulSet{}).
WithDefaulter(&StatefulSetDefaulter{}).
Complete()
}

//+kubebuilder:webhook:path=/mutate-apps-v1-statefulset,mutating=true,failurePolicy=fail,sideEffects=None,groups=apps,resources=statefulsets,verbs=update,versions=v1,name=statefulset.kb.io,admissionReviewVersions=v1

type StatefulSetDefaulter struct{}

var _ admission.CustomDefaulter = &StatefulSetDefaulter{}

// Default implements webhook.Defaulter so a webhook will be registered for the type
func (*StatefulSetDefaulter) Default(ctx context.Context, obj runtime.Object) error {
sts, ok := obj.(*appsv1.StatefulSet)
if !ok {
return fmt.Errorf("unknown obj type %T", obj)
}

req, err := admission.RequestFromContext(ctx)
if err != nil {
return fmt.Errorf("failed to get admission request from context: %w", err)
}

if req.Operation != admissionv1.Update {
return nil
}

if len(sts.OwnerReferences) != 1 {
return nil
}

if sts.OwnerReferences[0].Kind != "MySQLCluster" && sts.OwnerReferences[0].APIVersion != GroupVersion.String() {
return nil
}

if sts.Annotations[constants.AnnForceRollingUpdate] == "true" {
sts.Spec.UpdateStrategy.RollingUpdate = nil
return nil
}

if sts.Spec.UpdateStrategy.RollingUpdate == nil || sts.Spec.UpdateStrategy.RollingUpdate.Partition == nil {
sts.Spec.UpdateStrategy.RollingUpdate = &appsv1.RollingUpdateStatefulSetStrategy{
Partition: ptr.To[int32](*sts.Spec.Replicas),
}
return nil
}

oldSts, err := readStatefulSet(req.OldObject.Raw)
if err != nil {
return fmt.Errorf("failed to read old statefulset: %w", err)
}

partition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition
oldPartition := *oldSts.Spec.UpdateStrategy.RollingUpdate.Partition

newSts := sts.DeepCopy()
newSts.Spec.UpdateStrategy = oldSts.Spec.UpdateStrategy

if partition != oldPartition && equality.Semantic.DeepEqual(newSts.Spec, oldSts.Spec) {
return nil
}

sts.Spec.UpdateStrategy.RollingUpdate = &appsv1.RollingUpdateStatefulSetStrategy{
Partition: ptr.To[int32](*sts.Spec.Replicas),
}

return nil
}

func readStatefulSet(raw []byte) (*appsv1.StatefulSet, error) {
var sts appsv1.StatefulSet

if _, _, err := unstructured.UnstructuredJSONScheme.Decode(raw, nil, &sts); err != nil {
return nil, err
}

sts.TypeMeta.APIVersion = appsv1.SchemeGroupVersion.Group + "/" + appsv1.SchemeGroupVersion.Version

return &sts, nil
}
170 changes: 170 additions & 0 deletions api/v1beta2/statefulset_webhhok_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package v1beta2_test

import (
"context"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/cybozu-go/moco/pkg/constants"
)

func makeStatefulSet() *appsv1.StatefulSet {
return &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "moco.cybozu.com/v1beta2",
Kind: "MySQLCluster",
Name: "test",
UID: "uid",
},
},
},
Spec: appsv1.StatefulSetSpec{
Replicas: ptr.To[int32](3),
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"foo": "bar"},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "mysql",
Image: "mysql:examle",
},
},
},
},
},
}
}

func deleteStatefulSet() error {
r := &appsv1.StatefulSet{}
err := k8sClient.Get(ctx, client.ObjectKey{Namespace: "default", Name: "test"}, r)
if apierrors.IsNotFound(err) {
return nil
}

if err != nil {
return err
}

r.Finalizers = nil
if err := k8sClient.Update(ctx, r); err != nil {
return err
}

if err := k8sClient.Delete(ctx, r); err != nil {
return err
}

return nil
}

var _ = Describe("MySQLCluster Webhook", func() {
ctx := context.TODO()

BeforeEach(func() {
err := deleteStatefulSet()
Expect(err).NotTo(HaveOccurred())
})

It("should not set partition when creating StatefulSet", func() {
r := makeStatefulSet()
err := k8sClient.Create(ctx, r)
Expect(err).NotTo(HaveOccurred())
Expect(r.Spec.UpdateStrategy.RollingUpdate).To(BeNil())
})

It("should set partition when updating StatefulSet", func() {
r := makeStatefulSet()
err := k8sClient.Create(ctx, r)
Expect(err).NotTo(HaveOccurred())

err = k8sClient.Update(ctx, r)
Expect(err).NotTo(HaveOccurred())
Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(r.Spec.Replicas))
})

It("should not set partition when forcing updating StatefulSet", func() {
r := makeStatefulSet()
err := k8sClient.Create(ctx, r)
Expect(err).NotTo(HaveOccurred())

r.Annotations = map[string]string{constants.AnnForceRollingUpdate: "true"}
err = k8sClient.Update(ctx, r)
Expect(err).NotTo(HaveOccurred())
Expect(r.Spec.UpdateStrategy.RollingUpdate).To(BeNil())
})

It("should set partition when forcing updating StatefulSet with invalid value", func() {
r := makeStatefulSet()
err := k8sClient.Create(ctx, r)
Expect(err).NotTo(HaveOccurred())

r.Annotations = map[string]string{constants.AnnForceRollingUpdate: "false"}
err = k8sClient.Update(ctx, r)
Expect(err).NotTo(HaveOccurred())
Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(r.Spec.Replicas))
})

It("should not update partition when updating StatefulSet with only partition changed", func() {
r := makeStatefulSet()
err := k8sClient.Create(ctx, r)
Expect(err).NotTo(HaveOccurred())

err = k8sClient.Update(ctx, r)
Expect(err).NotTo(HaveOccurred())
Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(r.Spec.Replicas))

r.Spec.UpdateStrategy.RollingUpdate.Partition = ptr.To[int32](2)
err = k8sClient.Update(ctx, r)
Expect(err).NotTo(HaveOccurred())
Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(ptr.To[int32](2)))
})

It("should update partition when updating StatefulSet with partition and same field changed", func() {
r := makeStatefulSet()
err := k8sClient.Create(ctx, r)
Expect(err).NotTo(HaveOccurred())

err = k8sClient.Update(ctx, r)
Expect(err).NotTo(HaveOccurred())
Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(r.Spec.Replicas))

r.Spec.Replicas = ptr.To[int32](5)
r.Spec.UpdateStrategy.RollingUpdate.Partition = ptr.To[int32](2)
err = k8sClient.Update(ctx, r)
Expect(err).NotTo(HaveOccurred())
Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(ptr.To[int32](5)))
})

It("should update partition when updating StatefulSet with partition unchanged", func() {
r := makeStatefulSet()
err := k8sClient.Create(ctx, r)
Expect(err).NotTo(HaveOccurred())

err = k8sClient.Update(ctx, r)
Expect(err).NotTo(HaveOccurred())
Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(r.Spec.Replicas))

r.Spec.Replicas = ptr.To[int32](5)
err = k8sClient.Update(ctx, r)
Expect(err).NotTo(HaveOccurred())
Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(ptr.To[int32](5)))
})
})
2 changes: 2 additions & 0 deletions api/v1beta2/webhook_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ var _ = BeforeSuite(func() {
Expect(err).NotTo(HaveOccurred())
err = (&mocov1beta2.BackupPolicy{}).SetupWebhookWithManager(mgr)
Expect(err).NotTo(HaveOccurred())
err = mocov1beta2.SetupStatefulSetWebhookWithManager(mgr)
Expect(err).NotTo(HaveOccurred())

//+kubebuilder:scaffold:webhook

Expand Down
15 changes: 15 additions & 0 deletions api/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions charts/moco/templates/generated/generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- pods/status
verbs:
- get
- apiGroups:
- ""
resources:
Expand Down Expand Up @@ -460,6 +466,25 @@ webhooks:
resources:
- mysqlclusters
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
service:
name: moco-webhook-service
namespace: '{{ .Release.Namespace }}'
path: /mutate-apps-v1-statefulset
failurePolicy: Fail
name: statefulset.kb.io
rules:
- apiGroups:
- apps
apiVersions:
- v1
operations:
- UPDATE
resources:
- statefulsets
sideEffects: None
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
Expand Down
14 changes: 14 additions & 0 deletions cmd/moco-controller/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ func subMain(ns, addr string, port int) error {
return err
}

if err = (&controllers.StatefulSetPartitionReconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("moco-controller"),
MaxConcurrentReconciles: config.maxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Partition")
return err
}

if err = (&controllers.PodWatcher{
Client: mgr.GetClient(),
ClusterManager: clusterMgr,
Expand All @@ -138,6 +147,11 @@ func subMain(ns, addr string, port int) error {
return err
}

if err := mocov1beta2.SetupStatefulSetWebhookWithManager(mgr); err != nil {
setupLog.Error(err, "unable to setup webhook", "webhook", "StatefulSet")
return err
}

if err := mgr.AddHealthzCheck("health", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
return err
Expand Down
6 changes: 6 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- pods/status
verbs:
- get
- apiGroups:
- ""
resources:
Expand Down
Loading

0 comments on commit f665979

Please sign in to comment.