Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pods are deleted without waiting for completion when switchover takes a long time #751

Merged
merged 18 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/helm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ jobs:
version: v0.23.0
node_image: kindest/node:v1.31.0
kubectl_version: v1.31.0
config: e2e/kind-config.yaml

- name: Apply cert-manager
run: |
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ manifests: controller-gen kustomize yq ## Generate WebhookConfiguration, Cluster
mkdir -p charts/moco/templates/generated/crds/
$(CONTROLLER_GEN) $(CRD_OPTIONS) rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
$(KUSTOMIZE) build config/crd -o config/crd/tests # Outputs static CRDs for use with Envtest.
$(KUSTOMIZE) build config/kustomize-to-helm/overlays/templates | $(YQ) e "." - > charts/moco/templates/generated/generated.yaml
$(KUSTOMIZE) build config/kustomize-to-helm/overlays/templates | $(YQ) e ". | del(select(.kind==\"ValidatingAdmissionPolicy\" or .kind==\"ValidatingAdmissionPolicyBinding\").metadata.namespace)" - > charts/moco/templates/generated/generated.yaml # Manually remove namespaces because the API version supported by kustomize is out of date.
echo '{{- if .Values.crds.enabled }}' > charts/moco/templates/generated/crds/moco_crds.yaml
$(KUSTOMIZE) build config/kustomize-to-helm/overlays/crds | $(YQ) e "." - >> charts/moco/templates/generated/crds/moco_crds.yaml
echo '{{- end }}' >> charts/moco/templates/generated/crds/moco_crds.yaml
Expand Down
9 changes: 9 additions & 0 deletions api/v1beta2/mysqlcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ type MySQLClusterSpec struct {
// +optional
MaxDelaySeconds *int `json:"maxDelaySeconds,omitempty"`

// MaxDelaySecondsForPodDeletion configures the maximum allowed replication delay before a Pod deletion is blocked.
// If the replication delay exceeds this threshold, deletion of the primary pod will be prevented.
// The default is 0 seconds.
// Setting this field to 0 disables the delay check for pod deletion.
// +kubebuilder:validation:Minimum=0
// +kubebuilder:default=0
// +optional
MaxDelaySecondsForPodDeletion int64 `json:"maxDelaySecondsForPodDeletion,omitempty"`

// StartupWaitSeconds is the maximum duration to wait for `mysqld` container to start working.
// The default is 3600 seconds.
// +kubebuilder:validation:Minimum=0
Expand Down
6 changes: 6 additions & 0 deletions charts/moco/templates/generated/crds/moco_crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2261,6 +2261,12 @@ spec:
description: 'MaxDelaySeconds configures the readiness probe of '
minimum: 0
type: integer
maxDelaySecondsForPodDeletion:
default: 0
description: MaxDelaySecondsForPodDeletion configures the maxim
format: int64
minimum: 0
type: integer
mysqlConfigMapName:
description: 'MySQLConfigMapName is a `ConfigMap` name of MySQL '
nullable: true
Expand Down
42 changes: 42 additions & 0 deletions charts/moco/templates/generated/generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,48 @@ spec:
app.kubernetes.io/component: moco-controller
app.kubernetes.io/name: '{{ include "moco.name" . }}'
---
apiVersion: admissionregistration.k8s.io/v1beta1
kind: ValidatingAdmissionPolicy
metadata:
labels:
app.kubernetes.io/managed-by: '{{ .Release.Service }}'
app.kubernetes.io/name: '{{ include "moco.name" . }}'
app.kubernetes.io/version: '{{ .Chart.AppVersion }}'
helm.sh/chart: '{{ include "moco.chart" . }}'
name: moco-delete-validator
spec:
failurePolicy: Fail
matchConstraints:
resourceRules:
- apiGroups:
- ""
apiVersions:
- '*'
operations:
- DELETE
resources:
- pods
validations:
- expression: |
!has(oldObject.metadata.annotations) ||
!("moco.cybozu.com/prevent-delete" in oldObject.metadata.annotations) ||
!(oldObject.metadata.annotations["moco.cybozu.com/prevent-delete"] == "true")
messageExpression: oldObject.metadata.name + ' is protected from deletion'
---
apiVersion: admissionregistration.k8s.io/v1beta1
kind: ValidatingAdmissionPolicyBinding
metadata:
labels:
app.kubernetes.io/managed-by: '{{ .Release.Service }}'
app.kubernetes.io/name: '{{ include "moco.name" . }}'
app.kubernetes.io/version: '{{ .Chart.AppVersion }}'
helm.sh/chart: '{{ include "moco.chart" . }}'
name: moco-delete-validator
spec:
policyName: moco-delete-validator
validationActions:
- Deny
---
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
Expand Down
68 changes: 68 additions & 0 deletions clustering/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1016,4 +1016,72 @@ var _ = Describe("manager", func() {
Expect(ms.backupWorkDirUsage).To(MetricsIs("==", 30))
Expect(ms.backupWarnings).To(MetricsIs("==", 2))
})

It("should detect replication delay and prevent deletion of primary", func() {
testSetupResources(ctx, 3, "")

cm := NewClusterManager(1*time.Second, mgr, of, af, stdr.New(nil))
defer cm.StopAll()

cluster, err := testGetCluster(ctx)
Expect(err).NotTo(HaveOccurred())
cm.Update(client.ObjectKeyFromObject(cluster), "test")
defer func() {
cm.Stop(client.ObjectKeyFromObject(cluster))
time.Sleep(400 * time.Millisecond)
Eventually(func(g Gomega) {
ch := make(chan prometheus.Metric, 2)
metrics.ErrantReplicasVec.Collect(ch)
g.Expect(ch).NotTo(Receive())
}).Should(Succeed())
}()

// set MaxDelaySecondsForPodDeletion to 10sec
cluster.Spec.MaxDelaySecondsForPodDeletion = 10
err = k8sClient.Update(ctx, cluster)
Expect(err).NotTo(HaveOccurred())

// wait for cluster's condition changes
Eventually(func(g Gomega) {
cluster, err = testGetCluster(ctx)
g.Expect(err).NotTo(HaveOccurred())

condHealthy, err := testGetCondition(cluster, mocov1beta2.ConditionHealthy)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(condHealthy.Status).To(Equal(metav1.ConditionTrue))
}).Should(Succeed())

// set replication delay to 100sec
primary := cluster.Status.CurrentPrimaryIndex
for i := 0; i < 3; i++ {
if i == primary {
continue
}
of.setSecondsBehindSource(cluster.PodHostname(i), 100)
}

// wait for the pods' annotations are updated
Eventually(func(g Gomega) {
pod := &corev1.Pod{}
err := k8sClient.Get(ctx, client.ObjectKey{Namespace: "test", Name: cluster.PodName(primary)}, pod)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(pod.Annotations).To(HaveKeyWithValue(constants.AnnPreventDelete, "true"))
}).Should(Succeed())

// set replication delay to 0sec
for i := 0; i < 3; i++ {
if i == primary {
continue
}
of.setSecondsBehindSource(cluster.PodHostname(i), 0)
}

// wait for the pods' annotations are updated
Eventually(func(g Gomega) {
pod := &corev1.Pod{}
err := k8sClient.Get(ctx, client.ObjectKey{Namespace: "test", Name: cluster.PodName(primary)}, pod)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(pod.Annotations).NotTo(HaveKey(constants.AnnPreventDelete))
}).Should(Succeed())
})
})
12 changes: 12 additions & 0 deletions clustering/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clustering

import (
"context"
"database/sql"
"errors"
"fmt"
"sort"
Expand Down Expand Up @@ -447,6 +448,12 @@ func (m *mockMySQL) setRetrievedGTIDSet(gtid string) {
m.status.ReplicaStatus.RetrievedGtidSet = gtid
}

func (m *mockMySQL) setSecondsBehindSource(seconds int64) {
m.mu.Lock()
defer m.mu.Unlock()
m.status.ReplicaStatus.SecondsBehindSource = sql.NullInt64{Int64: seconds, Valid: true}
}

type mockOpFactory struct {
orphaned int64

Expand Down Expand Up @@ -548,3 +555,8 @@ func (f *mockOpFactory) getKillConnectionsCount(name string) int {
defer f.mu.Unlock()
return f.countKillConnections[name]
}

func (f *mockOpFactory) setSecondsBehindSource(name string, seconds int64) {
m := f.getInstance(name)
m.setSecondsBehindSource(seconds)
}
32 changes: 32 additions & 0 deletions clustering/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,38 @@ func (p *managerProcess) addRoleLabel(ctx context.Context, ss *StatusSet, noRole
return nil
}

func (p *managerProcess) removeAnnPreventDelete(ctx context.Context, ss *StatusSet) error {
log := logFromContext(ctx)
for _, pod := range ss.Pods {
if _, exists := pod.Annotations[constants.AnnPreventDelete]; exists {
newPod := pod.DeepCopy()
delete(newPod.Annotations, constants.AnnPreventDelete)
log.Info("replication delay resolved, allow pod deletion", "pod", pod.Name)
if err := p.client.Patch(ctx, newPod, client.MergeFrom(pod)); err != nil {
return fmt.Errorf("failed to remove moco.cybozu.com/prevent-delete annotation: %w", err)
}
}
}
return nil
}

func (p *managerProcess) addAnnPreventDelete(ctx context.Context, ss *StatusSet) error {
log := logFromContext(ctx)
ppod := ss.Pods[ss.Primary]
newPod := ppod.DeepCopy()
if newPod.Annotations == nil {
newPod.Annotations = make(map[string]string)
}
if _, exists := newPod.Annotations[constants.AnnPreventDelete]; !exists {
newPod.Annotations[constants.AnnPreventDelete] = "true"
log.Info("replication delay detected, prevent pod deletion", "pod", ppod.Name)
if err := p.client.Patch(ctx, newPod, client.MergeFrom(ppod)); err != nil {
return fmt.Errorf("failed to add moco.cybozu.com/prevent-delete annotation: %w", err)
}
}
return nil
}

func (p *managerProcess) configure(ctx context.Context, ss *StatusSet) (bool, error) {
redo := false

Expand Down
14 changes: 13 additions & 1 deletion clustering/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,18 @@ func (p *managerProcess) do(ctx context.Context) (bool, error) {
return false, fmt.Errorf("failed to update status fields in MySQLCluster: %w", err)
}

if ss.PreventPodDeletion {
err := p.addAnnPreventDelete(ctx, ss)
if err != nil {
return false, fmt.Errorf("failed to add annotation to prevent pod deletion: %w", err)
}
} else {
err := p.removeAnnPreventDelete(ctx, ss)
if err != nil {
return false, fmt.Errorf("failed to remove annotation to prevent pod deletion: %w", err)
}
}

logFromContext(ctx).Info("cluster state is " + ss.State.String())
switch ss.State {
case StateOffline:
Expand All @@ -206,7 +218,7 @@ func (p *managerProcess) do(ctx context.Context) (bool, error) {
return false, nil

case StateHealthy, StateDegraded:
if ss.NeedSwitch {
if ss.NeedSwitch && !ss.PreventPodDeletion {
if err := p.switchover(ctx, ss); err != nil {
event.SwitchOverFailed.Emit(ss.Cluster, p.recorder, err)
return false, fmt.Errorf("failed to switchover: %w", err)
Expand Down
28 changes: 25 additions & 3 deletions clustering/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,10 @@ type StatusSet struct {
Errants []int
Candidates []int

NeedSwitch bool
Candidate int
State ClusterState
NeedSwitch bool
PreventPodDeletion bool
Candidate int
State ClusterState
}

// Close closes `ss.DBOps`.
Expand Down Expand Up @@ -235,6 +236,27 @@ func (p *managerProcess) GatherStatus(ctx context.Context) (*StatusSet, error) {
ss.ExecutedGTID = pst.GlobalVariables.ExecutedGTID
}

// detect replication delay
if cluster.Spec.MaxDelaySecondsForPodDeletion > 0 {
shunki-fujita marked this conversation as resolved.
Show resolved Hide resolved
preventPodDeletion := false
for i, ist := range ss.MySQLStatus {
if i == ss.Primary {
continue
}
if ist == nil {
continue
}
if ist.ReplicaStatus == nil {
continue
}
if ist.ReplicaStatus.SecondsBehindSource.Valid && ist.ReplicaStatus.SecondsBehindSource.Int64 > cluster.Spec.MaxDelaySecondsForPodDeletion {
preventPodDeletion = true
break
}
}
ss.PreventPodDeletion = preventPodDeletion
}

// detect errant replicas
if ss.ExecutedGTID != "" {
pst := ss.MySQLStatus[ss.Primary]
Expand Down
6 changes: 6 additions & 0 deletions config/crd/bases/moco.cybozu.com_mysqlclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ spec:
description: 'MaxDelaySeconds configures the readiness probe of '
minimum: 0
type: integer
maxDelaySecondsForPodDeletion:
default: 0
description: MaxDelaySecondsForPodDeletion configures the maxim
format: int64
minimum: 0
type: integer
mysqlConfigMapName:
description: 'MySQLConfigMapName is a `ConfigMap` name of MySQL '
nullable: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ spec:
description: 'MaxDelaySeconds configures the readiness probe of '
minimum: 0
type: integer
maxDelaySecondsForPodDeletion:
default: 0
description: MaxDelaySecondsForPodDeletion configures the maxim
format: int64
minimum: 0
type: integer
mysqlConfigMapName:
description: 'MySQLConfigMapName is a `ConfigMap` name of MySQL '
nullable: true
Expand Down
1 change: 1 addition & 0 deletions config/webhook/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
resources:
- manifests.yaml
- service.yaml
- validate_preventdelete.yaml

configurations:
- kustomizeconfig.yaml
27 changes: 27 additions & 0 deletions config/webhook/validate_preventdelete.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
apiVersion: admissionregistration.k8s.io/v1beta1
kind: ValidatingAdmissionPolicy
metadata:
name: delete-validator
spec:
failurePolicy: Fail
matchConstraints:
resourceRules:
- apiGroups: [""]
apiVersions: ["*"]
operations: ["DELETE"]
resources: ["pods"]
validations:
- expression: |
!has(oldObject.metadata.annotations) ||
!("moco.cybozu.com/prevent-delete" in oldObject.metadata.annotations) ||
!(oldObject.metadata.annotations["moco.cybozu.com/prevent-delete"] == "true")
messageExpression: oldObject.metadata.name + ' is protected from deletion'
---
apiVersion: admissionregistration.k8s.io/v1beta1
kind: ValidatingAdmissionPolicyBinding
metadata:
name: delete-validator
spec:
policyName: moco-delete-validator
validationActions:
- Deny
10 changes: 6 additions & 4 deletions controllers/mysqlcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,12 +514,14 @@ func (r *MySQLClusterReconciler) reconcileV1MyCnf(ctx context.Context, req ctrl.
return cms.Items[i].CreationTimestamp.Time.After(cms.Items[j].CreationTimestamp.Time)
})

for i, old := range cms.Items {
if i < r.MySQLConfigMapHistoryLimit {
oldMyCnfCount := 0
for _, old := range cms.Items {
if !strings.HasPrefix(old.Name, prefix) || old.Name == cmName {
continue
}

if strings.HasPrefix(old.Name, prefix) && old.Name != cmName {
oldMyCnfCount++
log.Info("found my.cnf configmap", "configMapName", old.Name, "count", oldMyCnfCount, "created", old.CreationTimestamp.Time)
if oldMyCnfCount > r.MySQLConfigMapHistoryLimit-1 {
yoheinbb marked this conversation as resolved.
Show resolved Hide resolved
if err := r.Delete(ctx, &old); err != nil {
return nil, fmt.Errorf("failed to delete old my.cnf configmap %s/%s: %w", old.Namespace, old.Name, err)
}
Expand Down
Loading