From 08421b381133805860f4d55a18fb2e92fb17b725 Mon Sep 17 00:00:00 2001 From: d-kuro Date: Wed, 10 Jan 2024 14:11:54 +0900 Subject: [PATCH] WIP --- .../moco/templates/generated/generated.yaml | 6 + config/rbac/role.yaml | 6 + controllers/partition_controller.go | 213 ++++++++++++++++++ pkg/constants/meta.go | 1 + 4 files changed, 226 insertions(+) create mode 100644 controllers/partition_controller.go diff --git a/charts/moco/templates/generated/generated.yaml b/charts/moco/templates/generated/generated.yaml index 1332da753..c890dfcb5 100644 --- a/charts/moco/templates/generated/generated.yaml +++ b/charts/moco/templates/generated/generated.yaml @@ -155,6 +155,12 @@ rules: - patch - update - watch + - apiGroups: + - "" + resources: + - pods/status + verbs: + - get - apiGroups: - "" resources: diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 753de4d32..52b5ee731 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -50,6 +50,12 @@ rules: - patch - update - watch +- apiGroups: + - "" + resources: + - pods/status + verbs: + - get - apiGroups: - "" resources: diff --git a/controllers/partition_controller.go b/controllers/partition_controller.go new file mode 100644 index 000000000..965248af7 --- /dev/null +++ b/controllers/partition_controller.go @@ -0,0 +1,213 @@ +package controllers + +import ( + "context" + "errors" + "fmt" + + mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2" + "github.com/cybozu-go/moco/pkg/constants" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + appsv1ac "k8s.io/client-go/applyconfigurations/apps/v1" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + crlog "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +var _ reconcile.Reconciler = &StatefulSetPartitionReconciler{} + +// StatefulSetPartitionReconciler reconciles a StatefulSet object +type StatefulSetPartitionReconciler struct { + client.Client + Recorder record.EventRecorder +} + +//+kubebuilder:rbac:groups=moco.cybozu.com,resources=mysqlclusters,verbs=get;list;watch +//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;update;patch +//+kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get +//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch +//+kubebuilder:rbac:groups="",resources=pods/status,verbs=get +//+kubebuilder:rbac:groups="",resources=events,verbs=create;update;patch + +// Reconcile implements Reconciler interface. +// See https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.8.3/pkg/reconcile#Reconciler +func (r *StatefulSetPartitionReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + log := crlog.FromContext(ctx) + + sts := &appsv1.StatefulSet{} + err := r.Get(ctx, req.NamespacedName, sts) + if err != nil { + if apierrors.IsNotFound(err) { + return reconcile.Result{}, nil + } + log.Error(err, "unable to fetch StatefulSet", "name", req.NamespacedName.Name, "namespace", req.NamespacedName.Namespace) + return reconcile.Result{}, err + } + + if !r.needPartitionUpdate(sts) { + return reconcile.Result{}, nil + } + + if r.isStatefulSetRolloutComplete(sts) { + return reconcile.Result{}, nil + } + + ready, err := r.isRolloutReady(ctx, sts) + if err != nil { + log.Error(err, "failed to check if rollout is ready", "name", req.NamespacedName.Name, "namespace", req.NamespacedName.Namespace) + return reconcile.Result{}, err + } + if !ready { + log.Info("rollout is not ready", "name", req.NamespacedName.Name, "namespace", req.NamespacedName.Namespace) + } + + if err := r.applyNewPartition(ctx, sts); err != nil { + log.Error(err, "failed to apply new partition", "name", req.NamespacedName.Name, "namespace", req.NamespacedName.Namespace) + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil +} + +func (r *StatefulSetPartitionReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&appsv1.StatefulSet{}). + Owns(&corev1.Pod{}). + Complete(r) +} + +// isRolloutReady returns true if the StatefulSet is ready for rolling update. +func (r *StatefulSetPartitionReconciler) isRolloutReady(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) { + isPodsReady, err := r.areAllChildPodsReady(ctx, sts) + if err != nil { + return false, fmt.Errorf("failed to check if all child pods are ready: %w", err) + } + + if !isPodsReady { + return false, nil + } + + cluster, err := r.getMySQLCluster(ctx, sts) + if err != nil { + return false, fmt.Errorf("failed to get MySQLCluster: %w", err) + } + + if !r.isMySQLClusterHealthy(cluster) { + return false, nil + } + + return false, nil +} + +// areAllChildPodsReady checks if all child Pods of a given StatefulSet are in Ready state. +// It lists all Pods that match the StatefulSet's selector in the same namespace, +// and checks their Ready condition status. +// The function returns true if all Pods are Ready, and false otherwise. +func (r *StatefulSetPartitionReconciler) areAllChildPodsReady(ctx context.Context, sts *appsv1.StatefulSet) (bool, error) { + podList := &corev1.PodList{} + listOpts := []client.ListOption{ + client.InNamespace(sts.Namespace), + client.MatchingLabels(sts.Spec.Selector.MatchLabels), + } + + if err := r.List(ctx, podList, listOpts...); err != nil { + return false, err + } + + if sts.Status.Replicas != int32(len(podList.Items)) { + return false, fmt.Errorf("number of child pods %d is not equal to replicas %d", len(podList.Items), sts.Status.Replicas) + } + + for _, pod := range podList.Items { + for _, condition := range pod.Status.Conditions { + if condition.Type == corev1.PodReady && condition.Status != corev1.ConditionTrue { + return false, nil + } + } + } + + return true, nil +} + +// isMySQLClusterHealthy checks the health status of a given MySQLCluster. +// It verifies that the 'Available' and 'Healthy' conditions in the cluster's status are 'True', +// and that the value of '.status.syncedReplicas' matches that of '.spec.replicas'. +// The function returns true if all conditions are met, and false otherwise. +func (r *StatefulSetPartitionReconciler) isMySQLClusterHealthy(cluster *mocov1beta2.MySQLCluster) bool { + for _, condition := range cluster.Status.Conditions { + if (condition.Type == mocov1beta2.ConditionAvailable || condition.Type == mocov1beta2.ConditionHealthy) && condition.Status != metav1.ConditionTrue { + return false + } + } + + return int32(cluster.Status.SyncedReplicas) == cluster.Spec.Replicas +} + +// getMySQLCluster retrieves the MySQLCluster release that owns a given StatefulSet. +func (r *StatefulSetPartitionReconciler) getMySQLCluster(ctx context.Context, sts *appsv1.StatefulSet) (*mocov1beta2.MySQLCluster, error) { + for _, ownerRef := range sts.GetOwnerReferences() { + if ownerRef.Kind != "MySQLCluster" { + continue + } + + cluster := &mocov1beta2.MySQLCluster{} + if err := r.Get(ctx, types.NamespacedName{Name: ownerRef.Name, Namespace: sts.Namespace}, cluster); err != nil { + return nil, err + } + + return cluster, nil + } + + return nil, fmt.Errorf("StatefulSet %s/%s has no owner reference to MySQLCluster", sts.Namespace, sts.Name) +} + +// isStatefulSetRolloutComplete returns true if the StatefulSet is update completed. +func (r *StatefulSetPartitionReconciler) isStatefulSetRolloutComplete(sts *appsv1.StatefulSet) bool { + return sts.Status.CurrentRevision == sts.Status.UpdateRevision +} + +// needPartitionUpdate returns true if the StatefulSet needs to update partition. +func (r *StatefulSetPartitionReconciler) needPartitionUpdate(sts *appsv1.StatefulSet) bool { + if sts.Annotations[constants.AnnDisablePartitioning] == "true" { + return false + } + if sts.Spec.UpdateStrategy.RollingUpdate == nil || sts.Spec.UpdateStrategy.RollingUpdate.Partition == nil { + return false + } + + return *sts.Spec.UpdateStrategy.RollingUpdate.Partition > 0 +} + +// applyNewPartition applies a new partition to the StatefulSet, +// subtracting 1 from the current partition. +func (r *StatefulSetPartitionReconciler) applyNewPartition(ctx context.Context, sts *appsv1.StatefulSet) error { + newPartition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition - 1 + + key := client.ObjectKey{ + Namespace: sts.Namespace, + Name: sts.Name, + } + + stsApplyCfg := appsv1ac.StatefulSet(sts.Name, sts.Namespace). + WithSpec(appsv1ac.StatefulSetSpec(). + WithUpdateStrategy(appsv1ac.StatefulSetUpdateStrategy(). + WithType(appsv1.RollingUpdateStatefulSetStrategyType). + WithRollingUpdate(appsv1ac.RollingUpdateStatefulSetStrategy().WithPartition(newPartition)), + ), + ) + + if _, err := apply(ctx, r.Client, key, stsApplyCfg, appsv1ac.ExtractStatefulSet); err != nil { + if errors.Is(err, ErrApplyConfigurationNotChanged) { + return nil + } + return fmt.Errorf("failed to apply new partition to StatefulSet %s/%s: %w", sts.Namespace, sts.Name, err) + } + + return nil +} diff --git a/pkg/constants/meta.go b/pkg/constants/meta.go index 061284ee7..8cba526f8 100644 --- a/pkg/constants/meta.go +++ b/pkg/constants/meta.go @@ -21,6 +21,7 @@ const ( AnnSecretVersion = "moco.cybozu.com/secret-version" AnnClusteringStopped = "moco.cybozu.com/clustering-stopped" AnnReconciliationStopped = "moco.cybozu.com/reconciliation-stopped" + AnnDisablePartitioning = "moco.cybozu.com/disable-partitioning" ) // MySQLClusterFinalizer is the finalizer specifier for MySQLCluster.