Skip to content

Commit

Permalink
longer TerminationGracePeriodSeconds
Browse files Browse the repository at this point in the history
Signed-off-by: shaoyue.chen <[email protected]>
  • Loading branch information
haorenfsa committed Jan 17, 2024
1 parent bfcbe1f commit 08b6a68
Show file tree
Hide file tree
Showing 24 changed files with 255 additions and 22 deletions.
1 change: 1 addition & 0 deletions charts/milvus-operator/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ rules:
- ""
resources:
- pods
- pods/exec
verbs:
- create
- delete
Expand Down
1 change: 1 addition & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ rules:
- ""
resources:
- pods
- pods/exec
verbs:
- create
- delete
Expand Down
1 change: 1 addition & 0 deletions deploy/manifests/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14171,6 +14171,7 @@ rules:
- ""
resources:
- pods
- pods/exec
verbs:
- create
- delete
Expand Down
36 changes: 35 additions & 1 deletion pkg/controllers/component_condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package controllers
import (
"context"
"fmt"
"time"

"github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1"
"github.com/milvus-io/milvus-operator/pkg/util/rest"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -171,7 +173,39 @@ var CheckMilvusStopped = func(ctx context.Context, cli client.Client, mc v1beta1
return false, err
}
if len(podList.Items) > 0 {
return false, nil
logger := ctrl.LoggerFrom(ctx)
logger.Info("milvus has pods not stopped", "pods count", len(podList.Items))
return false, ExecKillIfTerminatingTooLong(ctx, podList)
}
return true, nil
}

// gracefulStopTimeout is how long a pod may stay in the terminating state
// (measured from its DeletionTimestamp) before ExecKillIfTerminatingTooLong
// kills the milvus process inside it.
var gracefulStopTimeout = 30 * time.Second

// ExecKillIfTerminatingTooLong kills the milvus process in every pod of
// podList that has been terminating for at least gracefulStopTimeout.
//
// Pods without a DeletionTimestamp, and pods whose deletion was requested less
// than gracefulStopTimeout ago, are skipped. The kill is issued by exec-ing
// into the container named by the pod's AppLabelComponent label; per the
// original author's note, exec is used because tini ignores SIGKILL.
//
// Every eligible pod is attempted even when an earlier one fails. If any exec
// failed, the most recent failure is returned wrapped; otherwise nil.
func ExecKillIfTerminatingTooLong(ctx context.Context, podList *corev1.PodList) error {
	restCli := rest.GetRestClient()
	logger := ctrl.LoggerFrom(ctx)
	killCmd := []string{"bash", "-c", "pid=$(ps -C milvus -o pid=); kill -9 $pid"}

	var lastErr error
	for i := range podList.Items {
		pod := &podList.Items[i]

		// Only pods that have been terminating for too long are killed.
		stillGraceful := pod.DeletionTimestamp == nil ||
			time.Since(pod.DeletionTimestamp.Time) < gracefulStopTimeout
		if stillGraceful {
			continue
		}

		podKey := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
		container := pod.Labels[AppLabelComponent]

		logger.Info("kill milvus process", "pod", podKey, "container", container)
		stdout, stderr, execErr := restCli.Exec(ctx, pod.Namespace, pod.Name, container, killCmd)
		if execErr != nil {
			logger.Error(execErr, "kill milvus process err", "pod", podKey, "container", container)
			lastErr = execErr
		}
		logger.Info("kill milvus output", "pod", podKey, "stdout", stdout, "stderr", stderr)
	}

	if lastErr == nil {
		return nil
	}
	return errors.Wrap(lastErr, "failed to kill some milvus pod")
}
41 changes: 41 additions & 0 deletions pkg/controllers/component_condition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package controllers
import (
"context"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1"
"github.com/milvus-io/milvus-operator/pkg/util/rest"
"github.com/pkg/errors"
"github.com/prashantv/gostub"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -264,3 +266,42 @@ func TestGetComponentErrorDetail(t *testing.T) {
assert.Equal(t, "creating", ret.Deployment.Message)
})
}

// TestExecKillIfTerminatingTooLong covers the four outcomes of
// ExecKillIfTerminatingTooLong: pods not being deleted, pods deleted within
// the grace timeout, all kills succeeding, and one kill failing.
func TestExecKillIfTerminatingTooLong(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()
	mockRestClient := rest.NewMockRestClient(mockCtrl)

	// Install the mock as the package-level rest client and restore the
	// previous one afterwards, so a finished mock never leaks into other tests.
	prevRestClient := rest.GetRestClient()
	rest.SetRestClient(mockRestClient)
	defer rest.SetRestClient(prevRestClient)

	ctx := context.Background()

	// newPods returns two fresh pods; when deletedAt is non-nil both carry it
	// as their DeletionTimestamp. Each subtest builds its own list so that
	// subtests do not depend on state left behind by earlier ones.
	newPods := func(deletedAt *time.Time) *corev1.PodList {
		pods := &corev1.PodList{
			Items: []corev1.Pod{{}, {}},
		}
		if deletedAt == nil {
			return pods
		}
		for i := range pods.Items {
			pods.Items[i].DeletionTimestamp = &metav1.Time{Time: *deletedAt}
		}
		return pods
	}

	t.Run("delete not sent yet", func(t *testing.T) {
		err := ExecKillIfTerminatingTooLong(ctx, newPods(nil))
		assert.NoError(t, err)
	})

	t.Run("delete sent, but not timeout", func(t *testing.T) {
		now := time.Now()
		err := ExecKillIfTerminatingTooLong(ctx, newPods(&now))
		assert.NoError(t, err)
	})

	t.Run("kill ok", func(t *testing.T) {
		longAgo := time.Now().Add(-time.Hour)
		mockRestClient.EXPECT().Exec(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("", "", nil).Times(2)
		err := ExecKillIfTerminatingTooLong(ctx, newPods(&longAgo))
		assert.NoError(t, err)
	})

	t.Run("kill 1 ok,1 error", func(t *testing.T) {
		longAgo := time.Now().Add(-time.Hour)
		mockRestClient.EXPECT().Exec(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("", "", errors.New("test")).Times(1)
		mockRestClient.EXPECT().Exec(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("", "", nil).Times(1)
		err := ExecKillIfTerminatingTooLong(ctx, newPods(&longAgo))
		assert.Error(t, err)
	})
}
2 changes: 1 addition & 1 deletion pkg/controllers/deployment_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func updateSomeFieldsOnlyWhenRolling(template *corev1.PodTemplateSpec, updater d
},
}
}
template.Spec.TerminationGracePeriodSeconds = int64Ptr(300)
template.Spec.TerminationGracePeriodSeconds = int64Ptr(1800)
}

func updateSidecars(template *corev1.PodTemplateSpec, updater deploymentUpdater) {
Expand Down
26 changes: 21 additions & 5 deletions pkg/controllers/milvus_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -59,7 +60,7 @@ type MilvusReconciler struct {
//+kubebuilder:rbac:groups=apps,resources=statefulsets;deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps,resources=pods;secrets;services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=pods;pods/exec,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=services;configmaps;secrets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=serviceaccounts,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -82,6 +83,8 @@ type MilvusReconciler struct {
func (r *MilvusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.statusSyncer.RunIfNot()
globalCommonInfo.InitIfNot(r.Client)
logger := r.logger.WithValues("milvus", req.NamespacedName)
ctx = ctrl.LoggerInto(ctx, logger)
if !config.IsDebug() {
defer func() {
if err := recover(); err != nil {
Expand Down Expand Up @@ -111,25 +114,38 @@ func (r *MilvusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}
}
} else {
logger.Info("deleteing milvus")
if milvus.Status.Status != milvusv1beta1.StatusDeleting {
if !controllerutil.ContainsFinalizer(milvus, ForegroundDeletionFinalizer) {
// delete self again with foreground deletion
logger.Info("change background delete to foreground")
if err := r.Delete(ctx, milvus, client.PropagationPolicy(metav1.DeletePropagationForeground)); err != nil {
return ctrl.Result{}, err
}
}
milvus.Status.Status = milvusv1beta1.StatusDeleting
if err := r.Status().Update(ctx, milvus); err != nil {
return ctrl.Result{}, err
}
}

if controllerutil.ContainsFinalizer(milvus, ForegroundDeletionFinalizer) {
stopped, err := CheckMilvusStopped(ctx, r.Client, *milvus)
if !stopped || err != nil {
return ctrl.Result{RequeueAfter: unhealthySyncInterval}, err
stopped, err := CheckMilvusStopped(ctx, r.Client, *milvus)
if !stopped || err != nil {
if err != nil {
logger.Error(err, "deleting milvus: check milvus stopped failed")
} else {
logger.Info("deleting milvus: not all pod stopped, requeue")
}
return ctrl.Result{RequeueAfter: unhealthySyncInterval}, err
}

logger.Info("finalizing milvus")
if controllerutil.ContainsFinalizer(milvus, MilvusFinalizerName) {
if err := Finalize(ctx, r, *milvus); err != nil {
return ctrl.Result{}, err
}
// metrics
logger.Info("deleted milvus")
milvusStatusCollector.DeleteLabelValues(milvus.Namespace, milvus.Name)
controllerutil.RemoveFinalizer(milvus, MilvusFinalizerName)
err := r.Update(ctx, milvus)
Expand Down
12 changes: 3 additions & 9 deletions pkg/controllers/milvus_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestClusterReconciler(t *testing.T) {
assert.Error(t, err)
})

t.Run("case delete background", func(t *testing.T) {
t.Run("case delete background, change to foreground failed", func(t *testing.T) {
defer ctrl.Finish()
m.Finalizers = []string{MilvusFinalizerName}
mockCheckMilvusStopRet = false
Expand All @@ -103,16 +103,10 @@ func TestClusterReconciler(t *testing.T) {
mockClient.EXPECT().Status().Return(mockClient)
mockClient.EXPECT().Update(gomock.Any(), gomock.Any()).Times(1)

mockClient.EXPECT().Update(gomock.Any(), gomock.Any()).Do(
func(ctx, obj interface{}, opts ...interface{}) {
// finalizer should be removed
u := obj.(*v1beta1.Milvus)
assert.Equal(t, []string{}, u.Finalizers)
},
).Return(nil)
mockClient.EXPECT().Delete(gomock.Any(), gomock.Any(), client.PropagationPolicy(metav1.DeletePropagationForeground)).Times(1).Return(errMock)

_, err := r.Reconcile(ctx, reconcile.Request{})
assert.NoError(t, err)
assert.Error(t, err)
})

t.Run("delete foreground deletion", func(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/status_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1"
Expand Down Expand Up @@ -70,6 +71,7 @@ type MilvusStatusSyncer struct {
}

func NewMilvusStatusSyncer(ctx context.Context, client client.Client, logger logr.Logger) *MilvusStatusSyncer {
ctx = ctrl.LoggerInto(ctx, logger)
return &MilvusStatusSyncer{
ctx: ctx,
Client: client,
Expand Down
1 change: 0 additions & 1 deletion pkg/util/k8s_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func NewK8sClientsForConfig(config *rest.Config) (*K8sClients, error) {
if err != nil {
return nil, errors.Wrap(err, "failed to create dynamic client")
}

return &K8sClients{
ClientSet: clientSet,
ExtClientSet: extClientSet,
Expand Down
97 changes: 97 additions & 0 deletions pkg/util/rest/rest_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package rest

import (
"bytes"
"context"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
ctrl "sigs.k8s.io/controller-runtime"
)

//go:generate mockgen -source=./rest_client.go -destination=./rest_client_mock.go -package=rest RestClient

// RestClient is the set of raw REST operations against the Kubernetes API
// server offered by this package. It is an interface so that the mock
// generated by the go:generate directive above can replace it in tests.
type RestClient interface {
// Exec runs cmd in the given container of pod namespace/pod and returns what
// the command wrote to stdout and stderr.
Exec(ctx context.Context, namespace, pod, container string, cmd []string) (stdout string, stderr string, err error)
}

// RestClientImpl is the RestClient implementation built by newRestClientImpl.
type RestClientImpl struct {
// restClient builds the pods/exec request in Exec.
restClient rest.Interface
// config is handed to the SPDY executor that streams the exec session.
config *rest.Config
// scheme provides the parameter codec used to encode PodExecOptions.
scheme *runtime.Scheme
}

// singletonRestClient is the package-wide RestClient returned by
// GetRestClient. It is assigned in init and can be replaced via SetRestClient.
var singletonRestClient RestClient

// GetRestClient returns the package-wide RestClient: the one created in init,
// or whatever was last passed to SetRestClient.
func GetRestClient() RestClient {
return singletonRestClient
}

// SetRestClient replaces the package-wide RestClient with r. It exists for
// unit tests, which use it to install a mock. The assignment is not
// synchronized.
func SetRestClient(r RestClient) {
singletonRestClient = r
}

func init() {
config := ctrl.GetConfigOrDie()
restClient, err := newRestClientImpl(config)
if err != nil {
panic(err)
}
singletonRestClient = restClient
}

// newRestClientImpl builds a RestClientImpl that talks to the core/v1 API
// group.
//
// It works on a private copy of config, so the caller's *rest.Config is left
// untouched. On the copy it sets the core/v1 group version, a serializer built
// from a scheme containing the core/v1 types, and APIPath "/api". The core
// group is served under /api/v1; with an empty APIPath client-go would build
// request URLs under /v1 instead.
//
// It returns an error if the core/v1 types cannot be registered in the scheme
// or if the REST client cannot be created from the resulting configuration.
func newRestClientImpl(config *rest.Config) (*RestClientImpl, error) {
	scheme := runtime.NewScheme()
	if err := corev1.AddToScheme(scheme); err != nil {
		return nil, errors.Wrap(err, "failed to add corev1 to scheme")
	}

	// Copy first: the fields below are specific to this client and must not
	// leak into a config that the caller may reuse for other clients.
	cfg := rest.CopyConfig(config)
	cfg.APIPath = "/api"
	cfg.GroupVersion = &corev1.SchemeGroupVersion
	cfg.NegotiatedSerializer = serializer.NewCodecFactory(scheme)

	restClient, err := rest.RESTClientFor(cfg)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create rest client")
	}

	return &RestClientImpl{
		restClient: restClient,
		config:     cfg,
		scheme:     scheme,
	}, nil
}

// Exec runs cmd inside the given container of pod namespace/pod through the
// pods/exec subresource and returns what the command wrote to stdout and
// stderr. Stdin is not attached.
//
// If the executor cannot be created, both outputs are empty. If streaming
// fails, the output captured up to that point is returned together with the
// wrapped error, so callers can still log what the command printed.
//
// NOTE(review): ctx is not propagated into the stream. If the vendored
// client-go provides Executor.StreamWithContext, switching to it would let
// cancellation abort the exec; confirm the client-go version first.
func (clis RestClientImpl) Exec(ctx context.Context, namespace, pod, container string, cmd []string) (stdout string, stderr string, err error) {
	execOpts := &corev1.PodExecOptions{
		Container: container,
		Command:   cmd,
		Stdin:     false,
		Stdout:    true,
		Stderr:    true,
	}
	req := clis.restClient.Post().
		Resource("pods").
		Namespace(namespace).
		Name(pod).
		SubResource("exec").
		VersionedParams(execOpts, runtime.NewParameterCodec(clis.scheme))

	executor, err := remotecommand.NewSPDYExecutor(clis.config, "POST", req.URL())
	if err != nil {
		return "", "", errors.Wrap(err, "failed to create executor")
	}

	var stdoutBuf, stderrBuf bytes.Buffer
	err = executor.Stream(remotecommand.StreamOptions{
		Stdout: &stdoutBuf,
		Stderr: &stderrBuf,
	})
	if err != nil {
		// Keep the partial output: stderr usually explains why the command failed.
		return stdoutBuf.String(), stderrBuf.String(), errors.Wrap(err, "failed to exec command")
	}

	return stdoutBuf.String(), stderrBuf.String(), nil
}
5 changes: 5 additions & 0 deletions test/hello-milvus.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,8 @@
# Finally, drop the hello_milvus collection
print(fmt.format("Drop collection `hello_milvus`"))
utility.drop_collection("hello_milvus")


# create collection again to test stopping with collection
print(fmt.format("Recreate collection `hello_milvus`"))
hello_milvus = Collection("hello_milvus", schema, consistency_level="Strong")
2 changes: 2 additions & 0 deletions test/mc-2.1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ spec:
resources:
requests:
memory: 100Mi
persistence:
size: 20Gi
deletionPolicy: Delete
pvcDeletion: true
msgStreamType: kafka
Expand Down
2 changes: 2 additions & 0 deletions test/milvus-2.1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@ spec:
resources:
requests:
memory: 100Mi
persistence:
size: 20Gi
deletionPolicy: Delete
pvcDeletion: true
2 changes: 2 additions & 0 deletions test/min-mc-alpha.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ spec:
pvcDeletion: true
values:
mode: standalone
persistence:
size: 20Gi
config:
milvus:
log:
Expand Down
Loading

0 comments on commit 08b6a68

Please sign in to comment.