diff --git a/charts/milvus-operator/templates/clusterrole.yaml b/charts/milvus-operator/templates/clusterrole.yaml index 5907fa30..185d26d6 100644 --- a/charts/milvus-operator/templates/clusterrole.yaml +++ b/charts/milvus-operator/templates/clusterrole.yaml @@ -49,6 +49,7 @@ rules: - "" resources: - pods + - pods/exec verbs: - create - delete diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 12127330..51005b53 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -49,6 +49,7 @@ rules: - "" resources: - pods + - pods/exec verbs: - create - delete diff --git a/deploy/manifests/deployment.yaml b/deploy/manifests/deployment.yaml index 6b19ba62..52bfc8b3 100644 --- a/deploy/manifests/deployment.yaml +++ b/deploy/manifests/deployment.yaml @@ -14171,6 +14171,7 @@ rules: - "" resources: - pods + - pods/exec verbs: - create - delete diff --git a/pkg/controllers/component_condition.go b/pkg/controllers/component_condition.go index ba4ea7e4..2399e376 100644 --- a/pkg/controllers/component_condition.go +++ b/pkg/controllers/component_condition.go @@ -3,8 +3,10 @@ package controllers import ( "context" "fmt" + "time" "github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1" + "github.com/milvus-io/milvus-operator/pkg/util/rest" "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -171,7 +173,39 @@ var CheckMilvusStopped = func(ctx context.Context, cli client.Client, mc v1beta1 return false, err } if len(podList.Items) > 0 { - return false, nil + logger := ctrl.LoggerFrom(ctx) + logger.Info("milvus has pods not stopped", "pods count", len(podList.Items)) + return false, ExecKillIfTerminatingTooLong(ctx, podList) } return true, nil } + +var gracefulStopTimeout = time.Second * 30 + +func ExecKillIfTerminatingTooLong(ctx context.Context, podList *corev1.PodList) error { + // we use kubectl exec to kill milvus process, because tini ignore SIGKILL + cli := rest.GetRestClient() + var ret error + for _, pod := range podList.Items { + if pod.DeletionTimestamp == nil { + continue + } + if time.Since(pod.DeletionTimestamp.Time) < gracefulStopTimeout { + continue + } + // kill milvus process + logger := ctrl.LoggerFrom(ctx) + containerName := pod.Labels[AppLabelComponent] + logger.Info("kill milvus process", "pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name), "container", containerName) + stdout, stderr, err := cli.Exec(ctx, pod.Namespace, pod.Name, containerName, []string{"bash", "-c", "pid=$(ps -C milvus -o pid=); kill -9 $pid"}) + if err != nil { + logger.Error(err, "kill milvus process err", "pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name), "container", containerName) + ret = err + } + logger.Info("kill milvus output", "pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name), "stdout", stdout, "stderr", stderr) + } + if ret != nil { + return errors.Wrap(ret, "failed to kill some milvus pod") + } + return nil +} diff --git a/pkg/controllers/component_condition_test.go b/pkg/controllers/component_condition_test.go index 49f46f98..68b3a54e 100644 --- a/pkg/controllers/component_condition_test.go +++ b/pkg/controllers/component_condition_test.go @@ -3,9 +3,11 @@ package controllers import ( "context" "testing" + "time" "github.com/golang/mock/gomock" "github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1" + "github.com/milvus-io/milvus-operator/pkg/util/rest" "github.com/pkg/errors" "github.com/prashantv/gostub" "github.com/stretchr/testify/assert" @@ -264,3 +266,42 @@ func TestGetComponentErrorDetail(t *testing.T) { assert.Equal(t, "creating", ret.Deployment.Message) }) } + +func TestExecKillIfTerminatingTooLong(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + mockRestClient := rest.NewMockRestClient(mockCtrl) + ctx := context.Background() + rest.SetRestClient(mockRestClient) + pods := &corev1.PodList{ + Items: []corev1.Pod{{}, {}}, + } + t.Run("delete not sent yet", func(t *testing.T) { + err := ExecKillIfTerminatingTooLong(ctx, pods) + assert.NoError(t, err) + }) + + t.Run("delete sent, but not timeout", func(t *testing.T) { + pods.Items[0].DeletionTimestamp = &metav1.Time{Time: time.Now()} + pods.Items[1].DeletionTimestamp = &metav1.Time{Time: time.Now()} + err := ExecKillIfTerminatingTooLong(ctx, pods) + assert.NoError(t, err) + }) + + t.Run("kill ok", func(t *testing.T) { + pods.Items[0].DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-time.Hour)} + pods.Items[1].DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-time.Hour)} + mockRestClient.EXPECT().Exec(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("", "", nil).Times(2) + err := ExecKillIfTerminatingTooLong(ctx, pods) + assert.NoError(t, err) + }) + + t.Run("kill 1 ok,1 error", func(t *testing.T) { + pods.Items[0].DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-time.Hour)} + pods.Items[1].DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-time.Hour)} + mockRestClient.EXPECT().Exec(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("", "", errors.New("test")).Times(1) + mockRestClient.EXPECT().Exec(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("", "", nil).Times(1) + err := ExecKillIfTerminatingTooLong(ctx, pods) + assert.Error(t, err) + }) +} diff --git a/pkg/controllers/deployment_updater.go b/pkg/controllers/deployment_updater.go index f963b826..80b45518 100644 --- a/pkg/controllers/deployment_updater.go +++ b/pkg/controllers/deployment_updater.go @@ -243,7 +243,7 @@ func updateSomeFieldsOnlyWhenRolling(template *corev1.PodTemplateSpec, updater d }, } } - template.Spec.TerminationGracePeriodSeconds = int64Ptr(300) + template.Spec.TerminationGracePeriodSeconds = int64Ptr(1800) } func updateSidecars(template *corev1.PodTemplateSpec, updater deploymentUpdater) { diff --git a/pkg/controllers/milvus_controller.go b/pkg/controllers/milvus_controller.go index fd80a234..e0890831 100644 --- a/pkg/controllers/milvus_controller.go +++ b/pkg/controllers/milvus_controller.go @@ -22,6 +22,7 @@ import ( "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -59,7 +60,7 @@ type MilvusReconciler struct { //+kubebuilder:rbac:groups=apps,resources=statefulsets;deployments,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=apps,resources=pods;secrets;services,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups="",resources=pods;pods/exec,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups="",resources=services;configmaps;secrets,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups="",resources=serviceaccounts,verbs=get;list;watch;create;update;patch;delete @@ -82,6 +83,8 @@ type MilvusReconciler struct { func (r *MilvusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { r.statusSyncer.RunIfNot() globalCommonInfo.InitIfNot(r.Client) + logger := r.logger.WithValues("milvus", req.NamespacedName) + ctx = ctrl.LoggerInto(ctx, logger) if !config.IsDebug() { defer func() { if err := recover(); err != nil { @@ -111,25 +114,38 @@ func (r *MilvusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } } } else { + logger.Info("deleteing milvus") if milvus.Status.Status != milvusv1beta1.StatusDeleting { + if !controllerutil.ContainsFinalizer(milvus, ForegroundDeletionFinalizer) { + // delete self again with foreground deletion + logger.Info("change background delete to foreground") + if err := r.Delete(ctx, milvus, client.PropagationPolicy(metav1.DeletePropagationForeground)); err != nil { + return ctrl.Result{}, err + } + } milvus.Status.Status = milvusv1beta1.StatusDeleting if err := r.Status().Update(ctx, milvus); err != nil { return ctrl.Result{}, err } } - if controllerutil.ContainsFinalizer(milvus, ForegroundDeletionFinalizer) { - stopped, err := CheckMilvusStopped(ctx, r.Client, *milvus) - if !stopped || err != nil { - return ctrl.Result{RequeueAfter: unhealthySyncInterval}, err + stopped, err := CheckMilvusStopped(ctx, r.Client, *milvus) + if !stopped || err != nil { + if err != nil { + logger.Error(err, "deleting milvus: check milvus stopped failed") + } else { + logger.Info("deleting milvus: not all pod stopped, requeue") } + return ctrl.Result{RequeueAfter: unhealthySyncInterval}, err } + logger.Info("finalizing milvus") if controllerutil.ContainsFinalizer(milvus, MilvusFinalizerName) { if err := Finalize(ctx, r, *milvus); err != nil { return ctrl.Result{}, err } // metrics + logger.Info("deleted milvus") milvusStatusCollector.DeleteLabelValues(milvus.Namespace, milvus.Name) controllerutil.RemoveFinalizer(milvus, MilvusFinalizerName) err := r.Update(ctx, milvus) diff --git a/pkg/controllers/milvus_controller_test.go b/pkg/controllers/milvus_controller_test.go index a7bbd40f..de3c5e37 100644 --- a/pkg/controllers/milvus_controller_test.go +++ b/pkg/controllers/milvus_controller_test.go @@ -87,7 +87,7 @@ func TestClusterReconciler(t *testing.T) { assert.Error(t, err) }) - t.Run("case delete background", func(t *testing.T) { + t.Run("case delete background, change to foreground failed", func(t *testing.T) { defer ctrl.Finish() m.Finalizers = []string{MilvusFinalizerName} mockCheckMilvusStopRet = false @@ -103,16 +103,10 @@ func TestClusterReconciler(t *testing.T) { mockClient.EXPECT().Status().Return(mockClient) mockClient.EXPECT().Update(gomock.Any(), gomock.Any()).Times(1) - mockClient.EXPECT().Update(gomock.Any(), gomock.Any()).Do( - func(ctx, obj interface{}, opts ...interface{}) { - // finalizer should be removed - u := obj.(*v1beta1.Milvus) - assert.Equal(t, []string{}, u.Finalizers) - }, - ).Return(nil) + mockClient.EXPECT().Delete(gomock.Any(), gomock.Any(), client.PropagationPolicy(metav1.DeletePropagationForeground)).Times(1).Return(errMock) _, err := r.Reconcile(ctx, reconcile.Request{}) - assert.NoError(t, err) + assert.Error(t, err) }) t.Run("delete foreground deletion", func(t *testing.T) { diff --git a/pkg/controllers/status_cluster.go b/pkg/controllers/status_cluster.go index 4705f33e..37c24b2a 100644 --- a/pkg/controllers/status_cluster.go +++ b/pkg/controllers/status_cluster.go @@ -16,6 +16,7 @@ import ( kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1" @@ -70,6 +71,7 @@ type MilvusStatusSyncer struct { } func NewMilvusStatusSyncer(ctx context.Context, client client.Client, logger logr.Logger) *MilvusStatusSyncer { + ctx = ctrl.LoggerInto(ctx, logger) return &MilvusStatusSyncer{ ctx: ctx, Client: client, diff --git a/pkg/util/k8s_client.go b/pkg/util/k8s_client.go index 2a99195f..db96ed5c 100644 --- a/pkg/util/k8s_client.go +++ b/pkg/util/k8s_client.go @@ -65,7 +65,6 @@ func NewK8sClientsForConfig(config *rest.Config) (*K8sClients, error) { if err != nil { return nil, errors.Wrap(err, "failed to create dynamic client") } - return &K8sClients{ ClientSet: clientSet, ExtClientSet: extClientSet, diff --git a/pkg/util/rest/rest_client.go b/pkg/util/rest/rest_client.go new file mode 100644 index 00000000..4172ddd9 --- /dev/null +++ b/pkg/util/rest/rest_client.go @@ -0,0 +1,97 @@ +package rest + +import ( + "bytes" + "context" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" + ctrl "sigs.k8s.io/controller-runtime" +) + +//go:generate mockgen -source=./rest_client.go -destination=./rest_client_mock.go -package=rest RestClient + +type RestClient interface { + // Exec exec command in pod + Exec(ctx context.Context, namespace, pod, container string, cmd []string) (stdout string, stderr string, err error) +} + +type RestClientImpl struct { + restClient rest.Interface + config *rest.Config + scheme *runtime.Scheme +} + +var singletonRestClient RestClient + +func GetRestClient() RestClient { + return singletonRestClient +} + +// SetRestClient for unit test +func SetRestClient(r RestClient) { + singletonRestClient = r +} + +func init() { + config := ctrl.GetConfigOrDie() + restClient, err := newRestClientImpl(config) + if err != nil { + panic(err) + } + singletonRestClient = restClient +} + +func newRestClientImpl(config *rest.Config) (*RestClientImpl, error) { + scheme := runtime.NewScheme() + if err := corev1.AddToScheme(scheme); err != nil { + return nil, errors.Wrap(err, "failed to add corev1 to scheme") + } + config.NegotiatedSerializer = serializer.NewCodecFactory(scheme) + config.GroupVersion = &corev1.SchemeGroupVersion + restClient, err := rest.RESTClientFor(config) + if err != nil { + return nil, errors.Wrap(err, "failed to create rest client") + } + + return &RestClientImpl{ + restClient: restClient, + config: config, + scheme: scheme, + }, nil +} + +func (clis RestClientImpl) Exec(ctx context.Context, namespace, pod, container string, cmd []string) (stdout string, stderr string, err error) { + req := clis.restClient.Post(). + Resource("pods"). + Namespace(namespace). + Name(pod). + SubResource("exec"). + VersionedParams(&corev1.PodExecOptions{ + Container: container, + Command: cmd, + Stdin: false, + Stdout: true, + Stderr: true, + }, runtime.NewParameterCodec(clis.scheme)) + + exec, err := remotecommand.NewSPDYExecutor(clis.config, "POST", req.URL()) + if err != nil { + return "", "", errors.Wrap(err, "failed to create executor") + } + + var stdoutBuf, stderrBuf bytes.Buffer + err = exec.Stream(remotecommand.StreamOptions{ + Stdout: &stdoutBuf, + Stderr: &stderrBuf, + }) + if err != nil { + return "", "", errors.Wrap(err, "failed to exec command") + } + + return stdoutBuf.String(), stderrBuf.String(), nil +} diff --git a/test/hello-milvus.py b/test/hello-milvus.py index cabdb2b8..b6ed19a9 100644 --- a/test/hello-milvus.py +++ b/test/hello-milvus.py @@ -169,3 +169,8 @@ # Finally, drop the hello_milvus collection print(fmt.format("Drop collection `hello_milvus`")) utility.drop_collection("hello_milvus") + + +# create collection again to test stopping with collection +print(fmt.format("Recreate collection `hello_milvus`")) +hello_milvus = Collection("hello_milvus", schema, consistency_level="Strong") diff --git a/test/mc-2.1.yaml b/test/mc-2.1.yaml index 29d0708e..318b8667 100644 --- a/test/mc-2.1.yaml +++ b/test/mc-2.1.yaml @@ -24,6 +24,8 @@ spec: resources: requests: memory: 100Mi + persistence: + size: 20Gi deletionPolicy: Delete pvcDeletion: true msgStreamType: kafka diff --git a/test/milvus-2.1.yaml b/test/milvus-2.1.yaml index 70109774..a7064f01 100644 --- a/test/milvus-2.1.yaml +++ b/test/milvus-2.1.yaml @@ -21,5 +21,7 @@ spec: resources: requests: memory: 100Mi + persistence: + size: 20Gi deletionPolicy: Delete pvcDeletion: true diff --git a/test/min-mc-alpha.yaml b/test/min-mc-alpha.yaml index d973534a..c1a6a12f 100644 --- a/test/min-mc-alpha.yaml +++ b/test/min-mc-alpha.yaml @@ -131,6 +131,8 @@ spec: pvcDeletion: true values: mode: standalone + persistence: + size: 20Gi config: milvus: log: diff --git a/test/min-mc-feature.yaml b/test/min-mc-feature.yaml index 8a61569a..e92c34db 100644 --- a/test/min-mc-feature.yaml +++ b/test/min-mc-feature.yaml @@ -45,6 +45,8 @@ spec: pvcDeletion: true values: mode: standalone + persistence: + size: 20Gi config: rootCoord: enableActiveStandby: true diff --git a/test/min-mc-kafka.yaml b/test/min-mc-kafka.yaml index e752d95d..f95c2ff2 100644 --- a/test/min-mc-kafka.yaml +++ b/test/min-mc-kafka.yaml @@ -49,6 +49,8 @@ spec: pvcDeletion: true values: mode: standalone + persistence: + size: 20Gi config: milvus: log: diff --git a/test/min-mc-mixture.yaml b/test/min-mc-mixture.yaml index 38a33bad..43983402 100644 --- a/test/min-mc-mixture.yaml +++ b/test/min-mc-mixture.yaml @@ -134,6 +134,8 @@ spec: pvcDeletion: true values: mode: standalone + persistence: + size: 20Gi config: milvus: log: diff --git a/test/min-mc.yaml b/test/min-mc.yaml index 246f9fc4..952c8462 100644 --- a/test/min-mc.yaml +++ b/test/min-mc.yaml @@ -132,6 +132,8 @@ spec: pvcDeletion: true values: mode: standalone + persistence: + size: 20Gi config: milvus: log: diff --git a/test/min-milvus-alpha.yaml b/test/min-milvus-alpha.yaml index f2ffbf1a..e31b227b 100644 --- a/test/min-milvus-alpha.yaml +++ b/test/min-milvus-alpha.yaml @@ -36,6 +36,16 @@ spec: pvcDeletion: true values: mode: standalone + persistence: + size: 20Gi + rocksmq: + persistence: + enabled: true + persistentVolumeClaim: + spec: + resources: + limits: + storage: 20Gi persistence: enabled: true config: diff --git a/test/min-milvus-feature.yaml b/test/min-milvus-feature.yaml index dbf79f48..f33b0334 100644 --- a/test/min-milvus-feature.yaml +++ b/test/min-milvus-feature.yaml @@ -34,6 +34,16 @@ spec: resources: requests: memory: 100Mi + persistence: + size: 20Gi + rocksmq: + persistence: + enabled: true + persistentVolumeClaim: + spec: + resources: + limits: + storage: 20Gi config: rootCoord: enableActiveStandby: true diff --git a/test/min-milvus-kafka.yaml b/test/min-milvus-kafka.yaml index bbed3d18..986d3d38 100644 --- a/test/min-milvus-kafka.yaml +++ b/test/min-milvus-kafka.yaml @@ -50,9 +50,8 @@ spec: pvcDeletion: true values: mode: standalone - rocksmq: - persistence: - enabled: true + persistence: + size: 20Gi config: milvus: log: diff --git a/test/min-milvus.yaml b/test/min-milvus.yaml index 8839b0a6..3928c96b 100644 --- a/test/min-milvus.yaml +++ b/test/min-milvus.yaml @@ -42,9 +42,16 @@ spec: resources: requests: memory: 100Mi + persistence: + size: 20Gi rocksmq: persistence: enabled: true + persistentVolumeClaim: + spec: + resources: + limits: + storage: 20Gi config: milvus: log: diff --git a/test/sit.sh b/test/sit.sh index b1127099..f83593bd 100755 --- a/test/sit.sh +++ b/test/sit.sh @@ -56,7 +56,8 @@ check_milvus_available(){ delete_milvus_cluster(){ # Delete CR log "Deleting MilvusCluster ..." - kubectl delete -f $mcManifest + kubectl -n mc-sit delete milvus milvus --timeout=300s + kubectl delete ns mc-sit log "Checking PVC deleted ..." kubectl wait --timeout=1m pvc -n mc-sit --for=delete -l release=mc-sit-minio kubectl wait --timeout=1m pvc -n mc-sit --for=delete -l release=mc-sit-$msgStream @@ -105,7 +106,8 @@ case_create_delete_cluster(){ delete_milvus(){ # Delete CR log "Deleting Milvus ..." - kubectl delete -f $milvusManifest + kubectl -n milvus-sit delete milvus milvus --timeout=300s + kubectl delete ns milvus-sit log "Checking PVC deleted ..." kubectl wait --timeout=1m pvc -n milvus-sit --for=delete -l release=milvus-sit-minio kubectl wait --timeout=1m pvc -n milvus-sit --for=delete -l app.kubernetes.io/instance=milvus-sit-etcd