Skip to content

Commit

Permalink
Merge pull request #751 from cybozu-go/issue-747
Browse files Browse the repository at this point in the history
pods are deleted without waiting for completion when switchover takes a long time
  • Loading branch information
shunki-fujita authored Nov 20, 2024
2 parents 37fb0c5 + f537739 commit 1471dee
Show file tree
Hide file tree
Showing 23 changed files with 595 additions and 12 deletions.
1 change: 1 addition & 0 deletions .github/workflows/helm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ jobs:
version: v0.23.0
node_image: kindest/node:v1.31.0
kubectl_version: v1.31.0
config: e2e/kind-config.yaml

- name: Apply cert-manager
run: |
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ manifests: controller-gen kustomize yq ## Generate WebhookConfiguration, Cluster
mkdir -p charts/moco/templates/generated/crds/
$(CONTROLLER_GEN) $(CRD_OPTIONS) rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
$(KUSTOMIZE) build config/crd -o config/crd/tests # Outputs static CRDs for use with Envtest.
$(KUSTOMIZE) build config/kustomize-to-helm/overlays/templates | $(YQ) e "." - > charts/moco/templates/generated/generated.yaml
$(KUSTOMIZE) build config/kustomize-to-helm/overlays/templates | $(YQ) e ". | del(select(.kind==\"ValidatingAdmissionPolicy\" or .kind==\"ValidatingAdmissionPolicyBinding\").metadata.namespace)" - > charts/moco/templates/generated/generated.yaml # Manually remove namespaces because the API version supported by kustomize is out of date.
echo '{{- if .Values.crds.enabled }}' > charts/moco/templates/generated/crds/moco_crds.yaml
$(KUSTOMIZE) build config/kustomize-to-helm/overlays/crds | $(YQ) e "." - >> charts/moco/templates/generated/crds/moco_crds.yaml
echo '{{- end }}' >> charts/moco/templates/generated/crds/moco_crds.yaml
Expand Down
9 changes: 9 additions & 0 deletions api/v1beta2/mysqlcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ type MySQLClusterSpec struct {
// +optional
MaxDelaySeconds *int `json:"maxDelaySeconds,omitempty"`

// MaxDelaySecondsForPodDeletion configures the maximum allowed replication delay before a Pod deletion is blocked.
// If the replication delay exceeds this threshold, deletion of the primary pod will be prevented.
// The default is 0 seconds.
// Setting this field to 0 disables the delay check for pod deletion.
// +kubebuilder:validation:Minimum=0
// +kubebuilder:default=0
// +optional
MaxDelaySecondsForPodDeletion int64 `json:"maxDelaySecondsForPodDeletion,omitempty"`

// StartupWaitSeconds is the maximum duration to wait for `mysqld` container to start working.
// The default is 3600 seconds.
// +kubebuilder:validation:Minimum=0
Expand Down
6 changes: 6 additions & 0 deletions charts/moco/templates/generated/crds/moco_crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2261,6 +2261,12 @@ spec:
description: 'MaxDelaySeconds configures the readiness probe of '
minimum: 0
type: integer
maxDelaySecondsForPodDeletion:
default: 0
description: MaxDelaySecondsForPodDeletion configures the maxim
format: int64
minimum: 0
type: integer
mysqlConfigMapName:
description: 'MySQLConfigMapName is a `ConfigMap` name of MySQL '
nullable: true
Expand Down
42 changes: 42 additions & 0 deletions charts/moco/templates/generated/generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,48 @@ spec:
app.kubernetes.io/component: moco-controller
app.kubernetes.io/name: '{{ include "moco.name" . }}'
---
apiVersion: admissionregistration.k8s.io/v1beta1
kind: ValidatingAdmissionPolicy
metadata:
labels:
app.kubernetes.io/managed-by: '{{ .Release.Service }}'
app.kubernetes.io/name: '{{ include "moco.name" . }}'
app.kubernetes.io/version: '{{ .Chart.AppVersion }}'
helm.sh/chart: '{{ include "moco.chart" . }}'
name: moco-delete-validator
spec:
failurePolicy: Fail
matchConstraints:
resourceRules:
- apiGroups:
- ""
apiVersions:
- '*'
operations:
- DELETE
resources:
- pods
validations:
- expression: |
!has(oldObject.metadata.annotations) ||
!("moco.cybozu.com/prevent-delete" in oldObject.metadata.annotations) ||
!(oldObject.metadata.annotations["moco.cybozu.com/prevent-delete"] == "true")
messageExpression: oldObject.metadata.name + ' is protected from deletion'
---
apiVersion: admissionregistration.k8s.io/v1beta1
kind: ValidatingAdmissionPolicyBinding
metadata:
labels:
app.kubernetes.io/managed-by: '{{ .Release.Service }}'
app.kubernetes.io/name: '{{ include "moco.name" . }}'
app.kubernetes.io/version: '{{ .Chart.AppVersion }}'
helm.sh/chart: '{{ include "moco.chart" . }}'
name: moco-delete-validator
spec:
policyName: moco-delete-validator
validationActions:
- Deny
---
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
Expand Down
68 changes: 68 additions & 0 deletions clustering/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1016,4 +1016,72 @@ var _ = Describe("manager", func() {
Expect(ms.backupWorkDirUsage).To(MetricsIs("==", 30))
Expect(ms.backupWarnings).To(MetricsIs("==", 2))
})

It("should detect replication delay and prevent deletion of primary", func() {
testSetupResources(ctx, 3, "")

cm := NewClusterManager(1*time.Second, mgr, of, af, stdr.New(nil))
defer cm.StopAll()

cluster, err := testGetCluster(ctx)
Expect(err).NotTo(HaveOccurred())
cm.Update(client.ObjectKeyFromObject(cluster), "test")
defer func() {
cm.Stop(client.ObjectKeyFromObject(cluster))
time.Sleep(400 * time.Millisecond)
Eventually(func(g Gomega) {
ch := make(chan prometheus.Metric, 2)
metrics.ErrantReplicasVec.Collect(ch)
g.Expect(ch).NotTo(Receive())
}).Should(Succeed())
}()

// set MaxDelaySecondsForPodDeletion to 10sec
cluster.Spec.MaxDelaySecondsForPodDeletion = 10
err = k8sClient.Update(ctx, cluster)
Expect(err).NotTo(HaveOccurred())

// wait for cluster's condition changes
Eventually(func(g Gomega) {
cluster, err = testGetCluster(ctx)
g.Expect(err).NotTo(HaveOccurred())

condHealthy, err := testGetCondition(cluster, mocov1beta2.ConditionHealthy)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(condHealthy.Status).To(Equal(metav1.ConditionTrue))
}).Should(Succeed())

// set replication delay to 100sec
primary := cluster.Status.CurrentPrimaryIndex
for i := 0; i < 3; i++ {
if i == primary {
continue
}
of.setSecondsBehindSource(cluster.PodHostname(i), 100)
}

// wait for the pods' annotations are updated
Eventually(func(g Gomega) {
pod := &corev1.Pod{}
err := k8sClient.Get(ctx, client.ObjectKey{Namespace: "test", Name: cluster.PodName(primary)}, pod)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(pod.Annotations).To(HaveKeyWithValue(constants.AnnPreventDelete, "true"))
}).Should(Succeed())

// set replication delay to 0sec
for i := 0; i < 3; i++ {
if i == primary {
continue
}
of.setSecondsBehindSource(cluster.PodHostname(i), 0)
}

// wait for the pods' annotations are updated
Eventually(func(g Gomega) {
pod := &corev1.Pod{}
err := k8sClient.Get(ctx, client.ObjectKey{Namespace: "test", Name: cluster.PodName(primary)}, pod)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(pod.Annotations).NotTo(HaveKey(constants.AnnPreventDelete))
}).Should(Succeed())
})
})
12 changes: 12 additions & 0 deletions clustering/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clustering

import (
"context"
"database/sql"
"errors"
"fmt"
"sort"
Expand Down Expand Up @@ -447,6 +448,12 @@ func (m *mockMySQL) setRetrievedGTIDSet(gtid string) {
m.status.ReplicaStatus.RetrievedGtidSet = gtid
}

func (m *mockMySQL) setSecondsBehindSource(seconds int64) {
m.mu.Lock()
defer m.mu.Unlock()
m.status.ReplicaStatus.SecondsBehindSource = sql.NullInt64{Int64: seconds, Valid: true}
}

type mockOpFactory struct {
orphaned int64

Expand Down Expand Up @@ -548,3 +555,8 @@ func (f *mockOpFactory) getKillConnectionsCount(name string) int {
defer f.mu.Unlock()
return f.countKillConnections[name]
}

func (f *mockOpFactory) setSecondsBehindSource(name string, seconds int64) {
m := f.getInstance(name)
m.setSecondsBehindSource(seconds)
}
32 changes: 32 additions & 0 deletions clustering/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,38 @@ func (p *managerProcess) addRoleLabel(ctx context.Context, ss *StatusSet, noRole
return nil
}

func (p *managerProcess) removeAnnPreventDelete(ctx context.Context, ss *StatusSet) error {
log := logFromContext(ctx)
for _, pod := range ss.Pods {
if _, exists := pod.Annotations[constants.AnnPreventDelete]; exists {
newPod := pod.DeepCopy()
delete(newPod.Annotations, constants.AnnPreventDelete)
log.Info("replication delay resolved, allow pod deletion", "pod", pod.Name)
if err := p.client.Patch(ctx, newPod, client.MergeFrom(pod)); err != nil {
return fmt.Errorf("failed to remove moco.cybozu.com/prevent-delete annotation: %w", err)
}
}
}
return nil
}

func (p *managerProcess) addAnnPreventDelete(ctx context.Context, ss *StatusSet) error {
log := logFromContext(ctx)
ppod := ss.Pods[ss.Primary]
newPod := ppod.DeepCopy()
if newPod.Annotations == nil {
newPod.Annotations = make(map[string]string)
}
if _, exists := newPod.Annotations[constants.AnnPreventDelete]; !exists {
newPod.Annotations[constants.AnnPreventDelete] = "true"
log.Info("replication delay detected, prevent pod deletion", "pod", ppod.Name)
if err := p.client.Patch(ctx, newPod, client.MergeFrom(ppod)); err != nil {
return fmt.Errorf("failed to add moco.cybozu.com/prevent-delete annotation: %w", err)
}
}
return nil
}

func (p *managerProcess) configure(ctx context.Context, ss *StatusSet) (bool, error) {
redo := false

Expand Down
14 changes: 13 additions & 1 deletion clustering/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,18 @@ func (p *managerProcess) do(ctx context.Context) (bool, error) {
return false, fmt.Errorf("failed to update status fields in MySQLCluster: %w", err)
}

if ss.PreventPodDeletion {
err := p.addAnnPreventDelete(ctx, ss)
if err != nil {
return false, fmt.Errorf("failed to add annotation to prevent pod deletion: %w", err)
}
} else {
err := p.removeAnnPreventDelete(ctx, ss)
if err != nil {
return false, fmt.Errorf("failed to remove annotation to prevent pod deletion: %w", err)
}
}

logFromContext(ctx).Info("cluster state is " + ss.State.String())
switch ss.State {
case StateOffline:
Expand All @@ -206,7 +218,7 @@ func (p *managerProcess) do(ctx context.Context) (bool, error) {
return false, nil

case StateHealthy, StateDegraded:
if ss.NeedSwitch {
if ss.NeedSwitch && !ss.PreventPodDeletion {
if err := p.switchover(ctx, ss); err != nil {
event.SwitchOverFailed.Emit(ss.Cluster, p.recorder, err)
return false, fmt.Errorf("failed to switchover: %w", err)
Expand Down
28 changes: 25 additions & 3 deletions clustering/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,10 @@ type StatusSet struct {
Errants []int
Candidates []int

NeedSwitch bool
Candidate int
State ClusterState
NeedSwitch bool
PreventPodDeletion bool
Candidate int
State ClusterState
}

// Close closes `ss.DBOps`.
Expand Down Expand Up @@ -235,6 +236,27 @@ func (p *managerProcess) GatherStatus(ctx context.Context) (*StatusSet, error) {
ss.ExecutedGTID = pst.GlobalVariables.ExecutedGTID
}

// detect replication delay
if cluster.Spec.MaxDelaySecondsForPodDeletion > 0 {
preventPodDeletion := false
for i, ist := range ss.MySQLStatus {
if i == ss.Primary {
continue
}
if ist == nil {
continue
}
if ist.ReplicaStatus == nil {
continue
}
if ist.ReplicaStatus.SecondsBehindSource.Valid && ist.ReplicaStatus.SecondsBehindSource.Int64 > cluster.Spec.MaxDelaySecondsForPodDeletion {
preventPodDeletion = true
break
}
}
ss.PreventPodDeletion = preventPodDeletion
}

// detect errant replicas
if ss.ExecutedGTID != "" {
pst := ss.MySQLStatus[ss.Primary]
Expand Down
6 changes: 6 additions & 0 deletions config/crd/bases/moco.cybozu.com_mysqlclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ spec:
description: 'MaxDelaySeconds configures the readiness probe of '
minimum: 0
type: integer
maxDelaySecondsForPodDeletion:
default: 0
description: MaxDelaySecondsForPodDeletion configures the maxim
format: int64
minimum: 0
type: integer
mysqlConfigMapName:
description: 'MySQLConfigMapName is a `ConfigMap` name of MySQL '
nullable: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ spec:
description: 'MaxDelaySeconds configures the readiness probe of '
minimum: 0
type: integer
maxDelaySecondsForPodDeletion:
default: 0
description: MaxDelaySecondsForPodDeletion configures the maxim
format: int64
minimum: 0
type: integer
mysqlConfigMapName:
description: 'MySQLConfigMapName is a `ConfigMap` name of MySQL '
nullable: true
Expand Down
1 change: 1 addition & 0 deletions config/webhook/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
resources:
- manifests.yaml
- service.yaml
- validate_preventdelete.yaml

configurations:
- kustomizeconfig.yaml
27 changes: 27 additions & 0 deletions config/webhook/validate_preventdelete.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
apiVersion: admissionregistration.k8s.io/v1beta1
kind: ValidatingAdmissionPolicy
metadata:
name: delete-validator
spec:
failurePolicy: Fail
matchConstraints:
resourceRules:
- apiGroups: [""]
apiVersions: ["*"]
operations: ["DELETE"]
resources: ["pods"]
validations:
- expression: |
!has(oldObject.metadata.annotations) ||
!("moco.cybozu.com/prevent-delete" in oldObject.metadata.annotations) ||
!(oldObject.metadata.annotations["moco.cybozu.com/prevent-delete"] == "true")
messageExpression: oldObject.metadata.name + ' is protected from deletion'
---
apiVersion: admissionregistration.k8s.io/v1beta1
kind: ValidatingAdmissionPolicyBinding
metadata:
name: delete-validator
spec:
policyName: moco-delete-validator
validationActions:
- Deny
10 changes: 6 additions & 4 deletions controllers/mysqlcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,12 +514,14 @@ func (r *MySQLClusterReconciler) reconcileV1MyCnf(ctx context.Context, req ctrl.
return cms.Items[i].CreationTimestamp.Time.After(cms.Items[j].CreationTimestamp.Time)
})

for i, old := range cms.Items {
if i < r.MySQLConfigMapHistoryLimit {
oldMyCnfCount := 0
for _, old := range cms.Items {
if !strings.HasPrefix(old.Name, prefix) || old.Name == cmName {
continue
}

if strings.HasPrefix(old.Name, prefix) && old.Name != cmName {
oldMyCnfCount++
log.Info("found my.cnf configmap", "configMapName", old.Name, "count", oldMyCnfCount, "created", old.CreationTimestamp.Time)
if oldMyCnfCount > r.MySQLConfigMapHistoryLimit-1 {
if err := r.Delete(ctx, &old); err != nil {
return nil, fmt.Errorf("failed to delete old my.cnf configmap %s/%s: %w", old.Namespace, old.Name, err)
}
Expand Down
Loading

0 comments on commit 1471dee

Please sign in to comment.