diff --git a/.github/workflows/helm.yaml b/.github/workflows/helm.yaml
index 07b6b5f3..b094b804 100644
--- a/.github/workflows/helm.yaml
+++ b/.github/workflows/helm.yaml
@@ -40,6 +40,7 @@ jobs:
           version: v0.23.0
           node_image: kindest/node:v1.31.0
           kubectl_version: v1.31.0
+          config: e2e/kind-config.yaml

       - name: Apply cert-manager
         run: |
diff --git a/Makefile b/Makefile
index 80175d76..751df0db 100644
--- a/Makefile
+++ b/Makefile
@@ -63,7 +63,7 @@ manifests: controller-gen kustomize yq ## Generate WebhookConfiguration, Cluster
 	mkdir -p charts/moco/templates/generated/crds/
 	$(CONTROLLER_GEN) $(CRD_OPTIONS) rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
 	$(KUSTOMIZE) build config/crd -o config/crd/tests # Outputs static CRDs for use with Envtest.
-	$(KUSTOMIZE) build config/kustomize-to-helm/overlays/templates | $(YQ) e "." - > charts/moco/templates/generated/generated.yaml
+	$(KUSTOMIZE) build config/kustomize-to-helm/overlays/templates | $(YQ) e ". | del(select(.kind==\"ValidatingAdmissionPolicy\" or .kind==\"ValidatingAdmissionPolicyBinding\").metadata.namespace)" - > charts/moco/templates/generated/generated.yaml # Manually remove namespaces because the API version supported by kustomize is out of date.
 	echo '{{- if .Values.crds.enabled }}' > charts/moco/templates/generated/crds/moco_crds.yaml
 	$(KUSTOMIZE) build config/kustomize-to-helm/overlays/crds | $(YQ) e "." - >> charts/moco/templates/generated/crds/moco_crds.yaml
 	echo '{{- end }}' >> charts/moco/templates/generated/crds/moco_crds.yaml
diff --git a/api/v1beta2/mysqlcluster_types.go b/api/v1beta2/mysqlcluster_types.go
index 3d3de8a8..a4092330 100644
--- a/api/v1beta2/mysqlcluster_types.go
+++ b/api/v1beta2/mysqlcluster_types.go
@@ -87,6 +87,15 @@ type MySQLClusterSpec struct {
 	// +optional
 	MaxDelaySeconds *int `json:"maxDelaySeconds,omitempty"`

+	// MaxDelaySecondsForPodDeletion configures the maximum allowed replication delay before a Pod deletion is blocked.
+	// If the replication delay exceeds this threshold, deletion of the primary pod will be prevented.
+	// The default is 0 seconds.
+	// Setting this field to 0 disables the delay check for pod deletion.
+	// +kubebuilder:validation:Minimum=0
+	// +kubebuilder:default=0
+	// +optional
+	MaxDelaySecondsForPodDeletion int64 `json:"maxDelaySecondsForPodDeletion,omitempty"`
+
 	// StartupWaitSeconds is the maximum duration to wait for `mysqld` container to start working.
 	// The default is 3600 seconds.
 	// +kubebuilder:validation:Minimum=0
diff --git a/charts/moco/templates/generated/crds/moco_crds.yaml b/charts/moco/templates/generated/crds/moco_crds.yaml
index 1d4f4b4f..ee735f16 100644
--- a/charts/moco/templates/generated/crds/moco_crds.yaml
+++ b/charts/moco/templates/generated/crds/moco_crds.yaml
@@ -2261,6 +2261,12 @@ spec:
                 description: 'MaxDelaySeconds configures the readiness probe of '
                 minimum: 0
                 type: integer
+              maxDelaySecondsForPodDeletion:
+                default: 0
+                description: MaxDelaySecondsForPodDeletion configures the maxim
+                format: int64
+                minimum: 0
+                type: integer
               mysqlConfigMapName:
                 description: 'MySQLConfigMapName is a `ConfigMap` name of MySQL '
                 nullable: true
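For reference, the new field sits alongside the existing `maxDelaySeconds` knob and is set per cluster. A minimal spec fragment (illustrative values only; the full e2e manifest appears later in this diff):

```yaml
apiVersion: moco.cybozu.com/v1beta2
kind: MySQLCluster
metadata:
  name: test
spec:
  replicas: 3
  # Block DELETE of the primary pod while any replica lags more than 10 seconds.
  # 0 (the default) disables the check.
  maxDelaySecondsForPodDeletion: 10
```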
diff --git a/charts/moco/templates/generated/generated.yaml b/charts/moco/templates/generated/generated.yaml
index 8bd11a5f..ed07e253 100644
--- a/charts/moco/templates/generated/generated.yaml
+++ b/charts/moco/templates/generated/generated.yaml
@@ -372,6 +372,48 @@ spec:
       app.kubernetes.io/component: moco-controller
       app.kubernetes.io/name: '{{ include "moco.name" . }}'
 ---
+apiVersion: admissionregistration.k8s.io/v1beta1
+kind: ValidatingAdmissionPolicy
+metadata:
+  labels:
+    app.kubernetes.io/managed-by: '{{ .Release.Service }}'
+    app.kubernetes.io/name: '{{ include "moco.name" . }}'
+    app.kubernetes.io/version: '{{ .Chart.AppVersion }}'
+    helm.sh/chart: '{{ include "moco.chart" . }}'
+  name: moco-delete-validator
+spec:
+  failurePolicy: Fail
+  matchConstraints:
+    resourceRules:
+    - apiGroups:
+      - ""
+      apiVersions:
+      - '*'
+      operations:
+      - DELETE
+      resources:
+      - pods
+  validations:
+  - expression: |
+      !has(oldObject.metadata.annotations) ||
+      !("moco.cybozu.com/prevent-delete" in oldObject.metadata.annotations) ||
+      !(oldObject.metadata.annotations["moco.cybozu.com/prevent-delete"] == "true")
+    messageExpression: oldObject.metadata.name + ' is protected from deletion'
+---
+apiVersion: admissionregistration.k8s.io/v1beta1
+kind: ValidatingAdmissionPolicyBinding
+metadata:
+  labels:
+    app.kubernetes.io/managed-by: '{{ .Release.Service }}'
+    app.kubernetes.io/name: '{{ include "moco.name" . }}'
+    app.kubernetes.io/version: '{{ .Chart.AppVersion }}'
+    helm.sh/chart: '{{ include "moco.chart" . }}'
+  name: moco-delete-validator
+spec:
+  policyName: moco-delete-validator
+  validationActions:
+  - Deny
+---
 apiVersion: admissionregistration.k8s.io/v1
 kind: MutatingWebhookConfiguration
 metadata:
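The CEL expression above allows a DELETE unless the pod carries `moco.cybozu.com/prevent-delete: "true"`. For readers who want to poke at the expression outside a cluster, here is a minimal sketch using the cel-go library (not part of this PR; the `oldObject` binding is simplified to a plain map, whereas the API server supplies the full old object):

```go
package main

import (
	"fmt"

	"github.com/google/cel-go/cel"
)

func main() {
	env, err := cel.NewEnv(cel.Variable("oldObject", cel.DynType))
	if err != nil {
		panic(err)
	}
	ast, iss := env.Compile(`!has(oldObject.metadata.annotations) ||
		!("moco.cybozu.com/prevent-delete" in oldObject.metadata.annotations) ||
		!(oldObject.metadata.annotations["moco.cybozu.com/prevent-delete"] == "true")`)
	if iss != nil && iss.Err() != nil {
		panic(iss.Err())
	}
	prg, err := env.Program(ast)
	if err != nil {
		panic(err)
	}
	// A protected pod: the expression evaluates to false, so the DELETE is denied.
	out, _, err := prg.Eval(map[string]any{
		"oldObject": map[string]any{
			"metadata": map[string]any{
				"annotations": map[string]any{"moco.cybozu.com/prevent-delete": "true"},
			},
		},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // false -> deletion blocked
}
```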
diff --git a/clustering/manager_test.go b/clustering/manager_test.go
index 2e363a10..d619c934 100644
--- a/clustering/manager_test.go
+++ b/clustering/manager_test.go
@@ -1016,4 +1016,72 @@ var _ = Describe("manager", func() {
 		Expect(ms.backupWorkDirUsage).To(MetricsIs("==", 30))
 		Expect(ms.backupWarnings).To(MetricsIs("==", 2))
 	})
+
+	It("should detect replication delay and prevent deletion of primary", func() {
+		testSetupResources(ctx, 3, "")
+
+		cm := NewClusterManager(1*time.Second, mgr, of, af, stdr.New(nil))
+		defer cm.StopAll()
+
+		cluster, err := testGetCluster(ctx)
+		Expect(err).NotTo(HaveOccurred())
+		cm.Update(client.ObjectKeyFromObject(cluster), "test")
+		defer func() {
+			cm.Stop(client.ObjectKeyFromObject(cluster))
+			time.Sleep(400 * time.Millisecond)
+			Eventually(func(g Gomega) {
+				ch := make(chan prometheus.Metric, 2)
+				metrics.ErrantReplicasVec.Collect(ch)
+				g.Expect(ch).NotTo(Receive())
+			}).Should(Succeed())
+		}()
+
+		// set MaxDelaySecondsForPodDeletion to 10 seconds
+		cluster.Spec.MaxDelaySecondsForPodDeletion = 10
+		err = k8sClient.Update(ctx, cluster)
+		Expect(err).NotTo(HaveOccurred())
+
+		// wait for the cluster's condition to change
+		Eventually(func(g Gomega) {
+			cluster, err = testGetCluster(ctx)
+			g.Expect(err).NotTo(HaveOccurred())
+
+			condHealthy, err := testGetCondition(cluster, mocov1beta2.ConditionHealthy)
+			g.Expect(err).NotTo(HaveOccurred())
+			g.Expect(condHealthy.Status).To(Equal(metav1.ConditionTrue))
+		}).Should(Succeed())
+
+		// set the replication delay to 100 seconds
+		primary := cluster.Status.CurrentPrimaryIndex
+		for i := 0; i < 3; i++ {
+			if i == primary {
+				continue
+			}
+			of.setSecondsBehindSource(cluster.PodHostname(i), 100)
+		}
+
+		// wait for the pods' annotations to be updated
+		Eventually(func(g Gomega) {
+			pod := &corev1.Pod{}
+			err := k8sClient.Get(ctx, client.ObjectKey{Namespace: "test", Name: cluster.PodName(primary)}, pod)
+			g.Expect(err).NotTo(HaveOccurred())
+			g.Expect(pod.Annotations).To(HaveKeyWithValue(constants.AnnPreventDelete, "true"))
+		}).Should(Succeed())
+
+		// reset the replication delay to 0 seconds
+		for i := 0; i < 3; i++ {
+			if i == primary {
+				continue
+			}
+			of.setSecondsBehindSource(cluster.PodHostname(i), 0)
+		}
+
+		// wait for the pods' annotations to be updated
+		Eventually(func(g Gomega) {
+			pod := &corev1.Pod{}
+			err := k8sClient.Get(ctx, client.ObjectKey{Namespace: "test", Name: cluster.PodName(primary)}, pod)
+			g.Expect(err).NotTo(HaveOccurred())
+			g.Expect(pod.Annotations).NotTo(HaveKey(constants.AnnPreventDelete))
+		}).Should(Succeed())
+	})
 })
diff --git a/clustering/mock_test.go b/clustering/mock_test.go
index 604443d4..a8efa085 100644
--- a/clustering/mock_test.go
+++ b/clustering/mock_test.go
@@ -2,6 +2,7 @@ package clustering

 import (
 	"context"
+	"database/sql"
 	"errors"
 	"fmt"
 	"sort"
@@ -447,6 +448,12 @@ func (m *mockMySQL) setRetrievedGTIDSet(gtid string) {
 	m.status.ReplicaStatus.RetrievedGtidSet = gtid
 }

+func (m *mockMySQL) setSecondsBehindSource(seconds int64) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.status.ReplicaStatus.SecondsBehindSource = sql.NullInt64{Int64: seconds, Valid: true}
+}
+
 type mockOpFactory struct {
 	orphaned int64

@@ -548,3 +555,8 @@ func (f *mockOpFactory) getKillConnectionsCount(name string) int {
 	defer f.mu.Unlock()
 	return f.countKillConnections[name]
 }
+
+func (f *mockOpFactory) setSecondsBehindSource(name string, seconds int64) {
+	m := f.getInstance(name)
+	m.setSecondsBehindSource(seconds)
+}
diff --git a/clustering/operations.go b/clustering/operations.go
index 6a4887ec..f8edb83f 100644
--- a/clustering/operations.go
+++ b/clustering/operations.go
@@ -315,6 +315,38 @@ func (p *managerProcess) addRoleLabel(ctx context.Context, ss *StatusSet, noRole
 	return nil
 }

+func (p *managerProcess) removeAnnPreventDelete(ctx context.Context, ss *StatusSet) error {
+	log := logFromContext(ctx)
+	for _, pod := range ss.Pods {
+		if _, exists := pod.Annotations[constants.AnnPreventDelete]; exists {
+			newPod := pod.DeepCopy()
+			delete(newPod.Annotations, constants.AnnPreventDelete)
+			log.Info("replication delay resolved, allow pod deletion", "pod", pod.Name)
+			if err := p.client.Patch(ctx, newPod, client.MergeFrom(pod)); err != nil {
+				return fmt.Errorf("failed to remove moco.cybozu.com/prevent-delete annotation: %w", err)
+			}
+		}
+	}
+	return nil
+}
+
+func (p *managerProcess) addAnnPreventDelete(ctx context.Context, ss *StatusSet) error {
+	log := logFromContext(ctx)
+	ppod := ss.Pods[ss.Primary]
+	newPod := ppod.DeepCopy()
+	if newPod.Annotations == nil {
+		newPod.Annotations = make(map[string]string)
+	}
+	if _, exists := newPod.Annotations[constants.AnnPreventDelete]; !exists {
+		newPod.Annotations[constants.AnnPreventDelete] = "true"
+		log.Info("replication delay detected, prevent pod deletion", "pod", ppod.Name)
+		if err := p.client.Patch(ctx, newPod, client.MergeFrom(ppod)); err != nil {
+			return fmt.Errorf("failed to add moco.cybozu.com/prevent-delete annotation: %w", err)
+		}
+	}
+	return nil
+}
+
 func (p *managerProcess) configure(ctx context.Context, ss *StatusSet) (bool, error) {
 	redo := false

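Both helpers guard with an existence check before patching, so steady-state reconciles issue no API writes, and they patch via `client.MergeFrom` so only the annotation change goes over the wire. A sketch of the equivalent raw merge patches (illustrative only; the PR builds them from `DeepCopy`, and `c`/`pod` here stand in for a `client.Client` and a fetched Pod):

```go
import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func patchGuard(ctx context.Context, c client.Client, pod *corev1.Pod) error {
	// Adding the guard to the primary pod:
	add := client.RawPatch(types.MergePatchType,
		[]byte(`{"metadata":{"annotations":{"moco.cybozu.com/prevent-delete":"true"}}}`))
	if err := c.Patch(ctx, pod, add); err != nil {
		return err
	}
	// Removing it once the delay resolves; null deletes the key in a JSON merge patch:
	del := client.RawPatch(types.MergePatchType,
		[]byte(`{"metadata":{"annotations":{"moco.cybozu.com/prevent-delete":null}}}`))
	return c.Patch(ctx, pod, del)
}
```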
diff --git a/clustering/process.go b/clustering/process.go
index 054713f8..7d9be5ee 100644
--- a/clustering/process.go
+++ b/clustering/process.go
@@ -185,6 +185,18 @@ func (p *managerProcess) do(ctx context.Context) (bool, error) {
 		return false, fmt.Errorf("failed to update status fields in MySQLCluster: %w", err)
 	}

+	if ss.PreventPodDeletion {
+		err := p.addAnnPreventDelete(ctx, ss)
+		if err != nil {
+			return false, fmt.Errorf("failed to add annotation to prevent pod deletion: %w", err)
+		}
+	} else {
+		err := p.removeAnnPreventDelete(ctx, ss)
+		if err != nil {
+			return false, fmt.Errorf("failed to remove annotation to prevent pod deletion: %w", err)
+		}
+	}
+
 	logFromContext(ctx).Info("cluster state is " + ss.State.String())
 	switch ss.State {
 	case StateOffline:
@@ -206,7 +218,7 @@ func (p *managerProcess) do(ctx context.Context) (bool, error) {
 		return false, nil

 	case StateHealthy, StateDegraded:
-		if ss.NeedSwitch {
+		if ss.NeedSwitch && !ss.PreventPodDeletion {
 			if err := p.switchover(ctx, ss); err != nil {
 				event.SwitchOverFailed.Emit(ss.Cluster, p.recorder, err)
 				return false, fmt.Errorf("failed to switchover: %w", err)
diff --git a/clustering/status.go b/clustering/status.go
index 03760624..b55b10d4 100644
--- a/clustering/status.go
+++ b/clustering/status.go
@@ -92,9 +92,10 @@ type StatusSet struct {
 	Errants    []int
 	Candidates []int

-	NeedSwitch bool
-	Candidate  int
-	State      ClusterState
+	NeedSwitch         bool
+	PreventPodDeletion bool
+	Candidate          int
+	State              ClusterState
 }

 // Close closes `ss.DBOps`.
@@ -235,6 +236,27 @@ func (p *managerProcess) GatherStatus(ctx context.Context) (*StatusSet, error) {
 		ss.ExecutedGTID = pst.GlobalVariables.ExecutedGTID
 	}

+	// detect replication delay
+	if cluster.Spec.MaxDelaySecondsForPodDeletion > 0 {
+		preventPodDeletion := false
+		for i, ist := range ss.MySQLStatus {
+			if i == ss.Primary {
+				continue
+			}
+			if ist == nil {
+				continue
+			}
+			if ist.ReplicaStatus == nil {
+				continue
+			}
+			if ist.ReplicaStatus.SecondsBehindSource.Valid && ist.ReplicaStatus.SecondsBehindSource.Int64 > cluster.Spec.MaxDelaySecondsForPodDeletion {
+				preventPodDeletion = true
+				break
+			}
+		}
+		ss.PreventPodDeletion = preventPodDeletion
+	}
+
 	// detect errant replicas
 	if ss.ExecutedGTID != "" {
 		pst := ss.MySQLStatus[ss.Primary]
diff --git a/config/crd/bases/moco.cybozu.com_mysqlclusters.yaml b/config/crd/bases/moco.cybozu.com_mysqlclusters.yaml
index f40ee3a2..b7c40754 100644
--- a/config/crd/bases/moco.cybozu.com_mysqlclusters.yaml
+++ b/config/crd/bases/moco.cybozu.com_mysqlclusters.yaml
@@ -78,6 +78,12 @@ spec:
               description: 'MaxDelaySeconds configures the readiness probe of '
               minimum: 0
               type: integer
+            maxDelaySecondsForPodDeletion:
+              default: 0
+              description: MaxDelaySecondsForPodDeletion configures the maxim
+              format: int64
+              minimum: 0
+              type: integer
             mysqlConfigMapName:
               description: 'MySQLConfigMapName is a `ConfigMap` name of MySQL '
               nullable: true
diff --git a/config/crd/tests/apiextensions.k8s.io_v1_customresourcedefinition_mysqlclusters.moco.cybozu.com.yaml b/config/crd/tests/apiextensions.k8s.io_v1_customresourcedefinition_mysqlclusters.moco.cybozu.com.yaml
index d99dfc77..b9be3f33 100644
--- a/config/crd/tests/apiextensions.k8s.io_v1_customresourcedefinition_mysqlclusters.moco.cybozu.com.yaml
+++ b/config/crd/tests/apiextensions.k8s.io_v1_customresourcedefinition_mysqlclusters.moco.cybozu.com.yaml
@@ -78,6 +78,12 @@ spec:
               description: 'MaxDelaySeconds configures the readiness probe of '
               minimum: 0
               type: integer
+            maxDelaySecondsForPodDeletion:
+              default: 0
+              description: MaxDelaySecondsForPodDeletion configures the maxim
+              format: int64
+              minimum: 0
+              type: integer
             mysqlConfigMapName:
               description: 'MySQLConfigMapName is a `ConfigMap` name of MySQL '
               nullable: true
diff --git a/config/webhook/kustomization.yaml b/config/webhook/kustomization.yaml
index 9cf26134..81caa76c 100644
--- a/config/webhook/kustomization.yaml
+++ b/config/webhook/kustomization.yaml
@@ -1,6 +1,7 @@
 resources:
 - manifests.yaml
 - service.yaml
+- validate_preventdelete.yaml

 configurations:
 - kustomizeconfig.yaml
diff --git a/config/webhook/validate_preventdelete.yaml b/config/webhook/validate_preventdelete.yaml
new file mode 100644
index 00000000..a5ab99aa
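One detail worth noting in `GatherStatus`: `SHOW REPLICA STATUS` reports `Seconds_Behind_Source` as NULL while the replica threads are not running, which `database/sql` surfaces as a `sql.NullInt64` with `Valid == false`. The `Valid` check therefore keeps a stopped replica from being treated as delayed. A condensed sketch of the predicate (illustrative helper, not in the PR):

```go
import "database/sql"

// isDelayed mirrors the delay check in GatherStatus: a NULL
// Seconds_Behind_Source (replica threads stopped) never counts as delay.
func isDelayed(lag sql.NullInt64, thresholdSeconds int64) bool {
	return lag.Valid && lag.Int64 > thresholdSeconds
}
```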
--- /dev/null
+++ b/config/webhook/validate_preventdelete.yaml
@@ -0,0 +1,27 @@
+apiVersion: admissionregistration.k8s.io/v1beta1
+kind: ValidatingAdmissionPolicy
+metadata:
+  name: delete-validator
+spec:
+  failurePolicy: Fail
+  matchConstraints:
+    resourceRules:
+    - apiGroups: [""]
+      apiVersions: ["*"]
+      operations: ["DELETE"]
+      resources: ["pods"]
+  validations:
+  - expression: |
+      !has(oldObject.metadata.annotations) ||
+      !("moco.cybozu.com/prevent-delete" in oldObject.metadata.annotations) ||
+      !(oldObject.metadata.annotations["moco.cybozu.com/prevent-delete"] == "true")
+    messageExpression: oldObject.metadata.name + ' is protected from deletion'
+---
+apiVersion: admissionregistration.k8s.io/v1beta1
+kind: ValidatingAdmissionPolicyBinding
+metadata:
+  name: delete-validator
+spec:
+  policyName: moco-delete-validator
+  validationActions:
+  - Deny
diff --git a/controllers/mysqlcluster_controller.go b/controllers/mysqlcluster_controller.go
index 526b5ecd..f68e0afd 100644
--- a/controllers/mysqlcluster_controller.go
+++ b/controllers/mysqlcluster_controller.go
@@ -514,12 +514,14 @@ func (r *MySQLClusterReconciler) reconcileV1MyCnf(ctx context.Context, req ctrl.
 		return cms.Items[i].CreationTimestamp.Time.After(cms.Items[j].CreationTimestamp.Time)
 	})

-	for i, old := range cms.Items {
-		if i < r.MySQLConfigMapHistoryLimit {
+	oldMyCnfCount := 0
+	for _, old := range cms.Items {
+		if !strings.HasPrefix(old.Name, prefix) || old.Name == cmName {
 			continue
 		}
-
-		if strings.HasPrefix(old.Name, prefix) && old.Name != cmName {
+		oldMyCnfCount++
+		log.Info("found my.cnf configmap", "configMapName", old.Name, "count", oldMyCnfCount, "created", old.CreationTimestamp.Time)
+		if oldMyCnfCount > r.MySQLConfigMapHistoryLimit-1 {
 			if err := r.Delete(ctx, &old); err != nil {
 				return nil, fmt.Errorf("failed to delete old my.cnf configmap %s/%s: %w", old.Namespace, old.Name, err)
 			}
diff --git a/controllers/mysqlcluster_controller_test.go b/controllers/mysqlcluster_controller_test.go
index 70218d6b..ab2eb087 100644
--- a/controllers/mysqlcluster_controller_test.go
+++ b/controllers/mysqlcluster_controller_test.go
@@ -493,7 +493,7 @@ var _ = Describe("MySQLCluster reconciler", func() {
 			}

 			if mycnfCount != 2 {
-				return fmt.Errorf("the number of config maps is not history limits: %d", len(cms.Items))
+				return fmt.Errorf("the number of my.cnf config maps does not match the history limit: %d", mycnfCount)
 			}

 			var mycnfCMs []*corev1.ConfigMap
@@ -538,7 +538,7 @@ var _ = Describe("MySQLCluster reconciler", func() {
 			}

 			if mycnfCount != 2 {
-				return fmt.Errorf("the number of config maps is not history limits: %d", len(cms.Items))
+				return fmt.Errorf("the number of my.cnf config maps does not match the history limit: %d", mycnfCount)
 			}

 			var mycnfCMs []*corev1.ConfigMap
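The rewritten pruning loop counts only ConfigMaps that share the my.cnf prefix and are not the one currently in use, then deletes from the newest-first list once the count exceeds `MySQLConfigMapHistoryLimit - 1`, so the current map plus the N-1 most recent old ones survive. A standalone sketch of the rule (hypothetical names; same arithmetic as the loop above):

```go
package main

import "fmt"

func main() {
	// Sorted newest first, as in reconcileV1MyCnf (names are made up).
	names := []string{"moco-test-mycnf-c", "moco-test-mycnf-b", "moco-test-mycnf-a"}
	current := "moco-test-mycnf-c"
	limit := 2 // MySQLConfigMapHistoryLimit

	old := 0
	for _, n := range names {
		if n == current {
			continue
		}
		old++
		if old > limit-1 {
			fmt.Println("delete", n) // only moco-test-mycnf-a is deleted
		}
	}
}
```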
diff --git a/docs/crd_mysqlcluster_v1beta2.md b/docs/crd_mysqlcluster_v1beta2.md
index a42743e3..e89a8ebe 100644
--- a/docs/crd_mysqlcluster_v1beta2.md
+++ b/docs/crd_mysqlcluster_v1beta2.md
@@ -78,6 +78,7 @@ MySQLClusterSpec defines the desired state of MySQLCluster
 | collectors | Collectors is the list of collector flag names of mysqld_exporter. If this field is not empty, MOCO adds mysqld_exporter as a sidecar to collect and export mysqld metrics in Prometheus format.\n\nSee https://github.com/prometheus/mysqld_exporter/blob/master/README.md#collector-flags for flag names.\n\nExample: [\"engine_innodb_status\", \"info_schema.innodb_metrics\"] | []string | false |
 | serverIDBase | ServerIDBase, if set, will become the base number of server-id of each MySQL instance of this cluster. For example, if this is 100, the server-ids will be 100, 101, 102, and so on. If the field is not given or zero, MOCO automatically sets a random positive integer. | int32 | false |
 | maxDelaySeconds | MaxDelaySeconds configures the readiness probe of mysqld container. For a replica mysqld instance, if it is delayed to apply transactions over this threshold, the mysqld instance will be marked as non-ready. The default is 60 seconds. Setting this field to 0 disables the delay check in the probe. | *int | false |
+| maxDelaySecondsForPodDeletion | MaxDelaySecondsForPodDeletion configures the maximum allowed replication delay before a Pod deletion is blocked. If the replication delay exceeds this threshold, deletion of the primary pod will be prevented. The default is 0 seconds. Setting this field to 0 disables the delay check for pod deletion. | int64 | false |
 | startupWaitSeconds | StartupWaitSeconds is the maximum duration to wait for `mysqld` container to start working. The default is 3600 seconds. | int32 | false |
 | logRotationSchedule | LogRotationSchedule specifies the schedule to rotate MySQL logs. If not set, the default is to rotate logs every 5 minutes. See https://pkg.go.dev/github.com/robfig/cron/v3#hdr-CRON_Expression_Format for the field format. | string | false |
 | backupPolicyName | The name of BackupPolicy custom resource in the same namespace. If this is set, MOCO creates a CronJob to take backup of this MySQL cluster periodically. | *string | false |
diff --git a/e2e/failover_test.go b/e2e/failover_test.go
index cadd97f3..f3caee39 100644
--- a/e2e/failover_test.go
+++ b/e2e/failover_test.go
@@ -14,7 +14,7 @@ import (
 //go:embed testdata/failover.yaml
 var failoverYAML string

-var _ = Context("failure", Ordered, func() {
+var _ = Context("failover", Ordered, func() {
 	if doUpgrade {
 		return
 	}
diff --git a/e2e/kind-config.yaml b/e2e/kind-config.yaml
index c61aead9..b6dc4076 100644
--- a/e2e/kind-config.yaml
+++ b/e2e/kind-config.yaml
@@ -1,5 +1,9 @@
 apiVersion: kind.x-k8s.io/v1alpha4
 kind: Cluster
+featureGates:
+  ValidatingAdmissionPolicy: true
+runtimeConfig:
+  admissionregistration.k8s.io/v1beta1: true
 nodes:
 - role: control-plane
 - role: worker
diff --git a/e2e/kind-config_actions.yaml b/e2e/kind-config_actions.yaml
index 1c7ee2e2..7cde60a8 100644
--- a/e2e/kind-config_actions.yaml
+++ b/e2e/kind-config_actions.yaml
@@ -1,5 +1,9 @@
 apiVersion: kind.x-k8s.io/v1alpha4
 kind: Cluster
+featureGates:
+  ValidatingAdmissionPolicy: true
+runtimeConfig:
+  admissionregistration.k8s.io/v1beta1: true
 nodes:
 - role: control-plane
 - role: worker
diff --git a/e2e/prevent_delete_test.go b/e2e/prevent_delete_test.go
new file mode 100644
index 00000000..5f126336
--- /dev/null
+++ b/e2e/prevent_delete_test.go
@@ -0,0 +1,291 @@
+package e2e
+
+import (
+	_ "embed"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"strconv"
+	"time"
+
+	mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2"
+	"github.com/cybozu-go/moco/pkg/constants"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+//go:embed testdata/prevent_delete.yaml
+var preventDeleteYAML string
+
+func initializeMySQL() {
+	kubectlSafe(fillTemplate(preventDeleteYAML), "apply", "-f", "-")
+	Eventually(func() error {
+		cluster, err := getCluster("prevent-delete", "test")
+		Expect(err).NotTo(HaveOccurred())
+		for _, cond := range cluster.Status.Conditions {
+			if cond.Type != mocov1beta2.ConditionHealthy {
+				continue
+			}
+			if cond.Status == metav1.ConditionTrue {
+				return nil
+			}
+			return fmt.Errorf("cluster is not healthy: %s", cond.Status)
+		}
+		return errors.New("no health condition")
+	}).Should(Succeed())
+	time.Sleep(30 * time.Second)
+
+	// wait for the primary to be instance 1
+	Eventually(func() int {
+		cluster, err := getCluster("prevent-delete", "test")
+		Expect(err).NotTo(HaveOccurred())
+		if cluster.Status.CurrentPrimaryIndex != 1 {
+			kubectlSafe(nil, "moco", "-n", "prevent-delete", "switchover", "test")
+			time.Sleep(10 * time.Second)
+		}
+		return cluster.Status.CurrentPrimaryIndex
+	}).Should(Equal(1))
+
+	kubectlSafe(nil, "moco", "mysql", "-n", "prevent-delete", "-u", "moco-writable", "test", "--",
+		"-e", "CREATE DATABASE test")
+	kubectlSafe(nil, "moco", "mysql", "-n", "prevent-delete", "-u", "moco-writable", "test", "--",
+		"-e", "CREATE TABLE test.t (i INT)")
+}
+
+func cleanupMySQL() {
+	cluster, err := getCluster("prevent-delete", "test")
+	Expect(err).NotTo(HaveOccurred())
+	primary := cluster.Status.CurrentPrimaryIndex
+	for i := 0; i < 3; i++ {
+		if i == primary {
+			continue
+		}
+		setSourceDelay(i, 0)
+	}
+	time.Sleep(10 * time.Second)
+
+	kubectlSafe(nil, "delete", "mysqlclusters", "-n", "prevent-delete", "--all")
+	verifyAllPodsDeleted("prevent-delete")
+}
+
+func setSourceDelay(index, delay int) {
+	kubectlSafe(nil, "moco", "mysql", "-n", "prevent-delete", "-u", "moco-admin", "--index", strconv.Itoa(index), "test", "--", "-e", "STOP REPLICA SQL_THREAD")
+	kubectlSafe(nil, "moco", "mysql", "-n", "prevent-delete", "-u", "moco-admin", "--index", strconv.Itoa(index), "test", "--", "-e", fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_DELAY=%d", delay))
+	kubectlSafe(nil, "moco", "mysql", "-n", "prevent-delete", "-u", "moco-admin", "--index", strconv.Itoa(index), "test", "--", "-e", "START REPLICA")
+	if delay != 0 {
+		kubectlSafe(nil, "moco", "mysql", "-n", "prevent-delete", "-u", "moco-writable", "test", "--", "-e", "INSERT INTO test.t VALUES (1); COMMIT;")
+	}
+}
+
+var _ = Context("PreventDelete", func() {
+	if doUpgrade {
+		return
+	}
+
+	BeforeEach(func() {
+		initializeMySQL()
+	})
+
+	AfterEach(func() {
+		cleanupMySQL()
+	})
+
+	It("should add or remove prevent-delete annotation by replication delay", func() {
+		cluster, err := getCluster("prevent-delete", "test")
+		Expect(err).NotTo(HaveOccurred())
+		primary := cluster.Status.CurrentPrimaryIndex
+
+		// add the prevent-delete annotation manually and wait for the controller to remove it
+		for i := 0; i < 3; i++ {
+			kubectlSafe(nil, "annotate", "pod", "-n", "prevent-delete", cluster.PodName(i), "moco.cybozu.com/prevent-delete=true")
+			Eventually(func() error {
+				out, err := kubectl(nil, "get", "pod", "-n", "prevent-delete", cluster.PodName(i), "-o", "json")
+				Expect(err).NotTo(HaveOccurred())
+				pod := &corev1.Pod{}
+				err = json.Unmarshal(out, pod)
+				Expect(err).NotTo(HaveOccurred())
+				if _, exists := pod.Annotations[constants.AnnPreventDelete]; exists {
+					return errors.New("annotation is not removed")
+				}
+				return nil
+			}).Should(Succeed())
+		}
+
+		// set a huge replication delay
+		setSourceDelay(0, 10000)
+
+		// wait for the prevent-delete annotation to be added
+		Eventually(func() error {
+			out, err := kubectl(nil, "get", "pod", "-n", "prevent-delete", cluster.PodName(primary), "-o", "json")
+			Expect(err).NotTo(HaveOccurred())
+			pod := &corev1.Pod{}
+			err = json.Unmarshal(out, pod)
+			Expect(err).NotTo(HaveOccurred())
+			if val, exists := pod.Annotations[constants.AnnPreventDelete]; !exists {
+				return errors.New("annotation is not added")
+			} else if val != "true" {
+				return fmt.Errorf("annotation value is not true: %s", val)
+			}
+			return nil
+		}).Should(Succeed())
+
+		// deleting a pod that carries the prevent-delete annotation must fail
+		_, err = kubectl(nil, "delete", "pod", "-n", "prevent-delete", cluster.PodName(primary))
+		Expect(err.Error()).To(ContainSubstring("%s is protected from deletion", cluster.PodName(primary)))
+
+		// resolve the replication delay
+		setSourceDelay(0, 0)
+
+		// wait for the prevent-delete annotation to be removed
+		Eventually(func() error {
+			out, err := kubectl(nil, "get", "pod", "-n", "prevent-delete", cluster.PodName(primary), "-o", "json")
+			Expect(err).NotTo(HaveOccurred())
+			pod := &corev1.Pod{}
+			err = json.Unmarshal(out, pod)
+			Expect(err).NotTo(HaveOccurred())
+			if _, exists := pod.Annotations[constants.AnnPreventDelete]; exists {
+				return errors.New("annotation is not removed")
+			}
+			return nil
+		}).Should(Succeed())
+	})
+
+	It("should not finish rollout restart if replication delay occurs", func() {
+		cluster, err := getCluster("prevent-delete", "test")
+		Expect(err).NotTo(HaveOccurred())
+		primary := cluster.Status.CurrentPrimaryIndex
+
+		// set a huge replication delay
+		setSourceDelay(0, 10000)
+
+		// wait for the prevent-delete annotation to be added
+		Eventually(func() error {
+			out, err := kubectl(nil, "get", "pod", "-n", "prevent-delete", cluster.PodName(primary), "-o", "json")
+			Expect(err).NotTo(HaveOccurred())
+			pod := &corev1.Pod{}
+			err = json.Unmarshal(out, pod)
+			Expect(err).NotTo(HaveOccurred())
+			if val, exists := pod.Annotations[constants.AnnPreventDelete]; !exists {
+				return errors.New("annotation is not added")
+			} else if val != "true" {
+				return fmt.Errorf("annotation value is not true: %s", val)
+			}
+			return nil
+		}).Should(Succeed())
+
+		// the rollout restart must not finish while the delay persists
+		kubectlSafe(nil, "rollout", "restart", "sts", "-n", "prevent-delete", "moco-test")
+		Consistently(func() error {
+			out, err := kubectl(nil, "get", "sts", "-n", "prevent-delete", "moco-test", "-o", "json")
+			Expect(err).NotTo(HaveOccurred())
+			sts := &appsv1.StatefulSet{}
+			err = json.Unmarshal(out, sts)
+			Expect(err).NotTo(HaveOccurred())
+			if sts.Status.UpdatedReplicas != sts.Status.Replicas {
+				return errors.New("rollout restart is not finished")
+			}
+			return nil
+		}, 3*time.Minute).ShouldNot(Succeed())
+
+		// resolve the replication delay
+		setSourceDelay(0, 0)
+
+		// wait for the rollout restart to finish
+		Eventually(func() error {
+			out, err := kubectl(nil, "get", "sts", "-n", "prevent-delete", "moco-test", "-o", "json")
+			Expect(err).NotTo(HaveOccurred())
+			sts := &appsv1.StatefulSet{}
+			err = json.Unmarshal(out, sts)
+			Expect(err).NotTo(HaveOccurred())
+			if sts.Status.UpdatedReplicas != sts.Status.Replicas {
+				return errors.New("rollout restart is not finished")
+			}
+			return nil
+		}).Should(Succeed())
+
+		// wait for the cluster to become healthy
+		Eventually(func() error {
+			cluster, err := getCluster("prevent-delete", "test")
+			Expect(err).NotTo(HaveOccurred())
+			for _, cond := range cluster.Status.Conditions {
+				if cond.Type != mocov1beta2.ConditionHealthy {
+					continue
+				}
+				if cond.Status == metav1.ConditionTrue {
+					return nil
+				}
+				return fmt.Errorf("cluster is not healthy: %s", cond.Status)
+			}
+			return errors.New("no health condition")
+		}).Should(Succeed())
+		time.Sleep(30 * time.Second)
+	})
+
+	It("should not finish switchover if replication delay occurs", func() {
+		cluster, err := getCluster("prevent-delete", "test")
+		Expect(err).NotTo(HaveOccurred())
+		primary := cluster.Status.CurrentPrimaryIndex
+
+		// set a huge replication delay
+		setSourceDelay(0, 10000)
+
+		// wait for the prevent-delete annotation to be added
+		Eventually(func() error {
+			out, err := kubectl(nil, "get", "pod", "-n", "prevent-delete", cluster.PodName(primary), "-o", "json")
+			Expect(err).NotTo(HaveOccurred())
+			pod := &corev1.Pod{}
+			err = json.Unmarshal(out, pod)
+			Expect(err).NotTo(HaveOccurred())
+			if val, exists := pod.Annotations[constants.AnnPreventDelete]; !exists {
+				return errors.New("annotation is not added")
+			} else if val != "true" {
+				return fmt.Errorf("annotation value is not true: %s", val)
+			}
+			return nil
+		}).Should(Succeed())
+
+		// the switchover must not finish while the delay persists
+		kubectlSafe(nil, "moco", "switchover", "-n", "prevent-delete", "test")
+		Consistently(func() error {
+			cluster, err := getCluster("prevent-delete", "test")
+			Expect(err).NotTo(HaveOccurred())
+			if cluster.Status.CurrentPrimaryIndex == primary {
+				return errors.New("switchover is not finished")
+			}
+			return nil
+		}, 1*time.Minute).ShouldNot(Succeed())
+
+		// resolve the replication delay
+		setSourceDelay(0, 0)
+
+		// wait for the switchover to finish
+		Eventually(func() error {
+			cluster, err := getCluster("prevent-delete", "test")
+			Expect(err).NotTo(HaveOccurred())
+			if cluster.Status.CurrentPrimaryIndex == primary {
+				return errors.New("switchover is not finished")
+			}
+			return nil
+		}).Should(Succeed())
+
+		// wait for the cluster to become healthy
+		Eventually(func() error {
+			cluster, err := getCluster("prevent-delete", "test")
+			Expect(err).NotTo(HaveOccurred())
+			for _, cond := range cluster.Status.Conditions {
+				if cond.Type != mocov1beta2.ConditionHealthy {
+					continue
+				}
+				if cond.Status == metav1.ConditionTrue {
+					return nil
+				}
+				return fmt.Errorf("cluster is not healthy: %s", cond.Status)
+			}
+			return errors.New("no health condition")
+		}).Should(Succeed())
+		time.Sleep(30 * time.Second)
+	})
+})
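The setSourceDelay helper above manufactures deterministic lag with MySQL's delayed replication: SOURCE_DELAY makes the replica's SQL thread hold each transaction for the given number of seconds, and the follow-up INSERT on the primary gives it a transaction to hold, so Seconds_Behind_Source starts climbing. The statements it runs (via `kubectl moco mysql`) boil down to:

```sql
-- on the replica being delayed
STOP REPLICA SQL_THREAD;
CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 10000;
START REPLICA;

-- on the primary: create a transaction for the delay to act on
INSERT INTO test.t VALUES (1);
COMMIT;
```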
diff --git a/e2e/testdata/prevent_delete.yaml b/e2e/testdata/prevent_delete.yaml
new file mode 100644
index 00000000..cc304803
--- /dev/null
+++ b/e2e/testdata/prevent_delete.yaml
@@ -0,0 +1,36 @@
+apiVersion: v1
+kind: Namespace
+metadata:
+  name: prevent-delete
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  namespace: prevent-delete
+  name: mycnf
+data:
+  innodb_log_file_size: "10M"
+---
+apiVersion: moco.cybozu.com/v1beta2
+kind: MySQLCluster
+metadata:
+  namespace: prevent-delete
+  name: test
+spec:
+  mysqlConfigMapName: mycnf
+  replicas: 3
+  maxDelaySeconds: 0
+  maxDelaySecondsForPodDeletion: 10
+  podTemplate:
+    spec:
+      containers:
+      - name: mysqld
+        image: ghcr.io/cybozu-go/moco/mysql:{{ . }}
+  volumeClaimTemplates:
+  - metadata:
+      name: mysql-data
+    spec:
+      accessModes: [ "ReadWriteOnce" ]
+      resources:
+        requests:
+          storage: 1Gi
diff --git a/pkg/constants/meta.go b/pkg/constants/meta.go
index f7cabf8e..970f28b2 100644
--- a/pkg/constants/meta.go
+++ b/pkg/constants/meta.go
@@ -22,6 +22,7 @@ const (
 	AnnClusteringStopped     = "moco.cybozu.com/clustering-stopped"
 	AnnReconciliationStopped = "moco.cybozu.com/reconciliation-stopped"
 	AnnForceRollingUpdate    = "moco.cybozu.com/force-rolling-update"
+	AnnPreventDelete         = "moco.cybozu.com/prevent-delete"
 )

 // MySQLClusterFinalizer is the finalizer specifier for MySQLCluster.
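Downstream code that needs to know whether a pod is currently guarded can test the new constant directly. A trivial helper (hypothetical, not part of this PR) that mirrors the CEL condition the policy enforces:

```go
import (
	corev1 "k8s.io/api/core/v1"

	"github.com/cybozu-go/moco/pkg/constants"
)

// isProtected reports whether the ValidatingAdmissionPolicy would deny
// a DELETE of this pod: the annotation must exist and equal "true".
func isProtected(pod *corev1.Pod) bool {
	return pod.Annotations[constants.AnnPreventDelete] == "true"
}
```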