From fb5c64d27ba594431e10dd9b19f64d09259d985c Mon Sep 17 00:00:00 2001 From: shaoyue Date: Fri, 6 Dec 2024 14:24:32 +0800 Subject: [PATCH] fix rocksmq config (#219) Signed-off-by: haorenfsa --- pkg/controllers/configmaps.go | 4 +- pkg/controllers/configmaps_test.go | 92 +++++++++++++++++++----------- 2 files changed, 61 insertions(+), 35 deletions(-) diff --git a/pkg/controllers/configmaps.go b/pkg/controllers/configmaps.go index 17305e1..22af14a 100644 --- a/pkg/controllers/configmaps.go +++ b/pkg/controllers/configmaps.go @@ -68,7 +68,9 @@ func (r *MilvusReconciler) updateConfigMap(ctx context.Context, mc v1beta1.Milvu delete(conf, "rocksmq") case v1beta1.MsgStreamTypeRocksMQ: // adhoc: to let the merger know we're using rocksmq config - conf["rocksmq"] = map[string]interface{}{} + if conf["rocksmq"] == nil { + conf["rocksmq"] = map[string]interface{}{} + } // delete other mq config to make milvus use rocksmq delete(conf, "pulsar") delete(conf, "kafka") diff --git a/pkg/controllers/configmaps_test.go b/pkg/controllers/configmaps_test.go index e7029d0..855fbdd 100644 --- a/pkg/controllers/configmaps_test.go +++ b/pkg/controllers/configmaps_test.go @@ -6,12 +6,12 @@ import ( "fmt" "testing" + "github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1" + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" corev1 "k8s.io/api/core/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" - - "github.com/stretchr/testify/assert" - "go.uber.org/mock/gomock" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" ) @@ -82,37 +82,37 @@ func TestReconcileConfigMaps_Existed(t *testing.T) { mockClient.EXPECT(). Update(gomock.Any(), gomock.Any()).Return(nil), ) - }) - err := r.ReconcileConfigMaps(ctx, mc) - assert.NoError(t, err) - mockClient.EXPECT(). - Get(gomock.Any(), gomock.Any(), gomock.AssignableToTypeOf(&corev1.ConfigMap{})). - DoAndReturn(func(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...any) error { - cm := obj.(*corev1.ConfigMap) - cm.Name = "cm1" - cm.Namespace = "ns" - err := mockClient.Get(ctx, client.ObjectKeyFromObject(cm), cm) - if err != nil { - return err - } - if len(cm.Data) == 0 { - return errors.New("expect data in configmap") - } - if _, ok := cm.Data["hook.yaml"]; !ok { - return errors.New("expect hook.yaml as key in data") - } - expected, err := yaml.Marshal(map[string]string{ - "x": "y", + err := r.ReconcileConfigMaps(ctx, mc) + assert.NoError(t, err) + mockClient.EXPECT(). + Get(gomock.Any(), gomock.Any(), gomock.AssignableToTypeOf(&corev1.ConfigMap{})). + DoAndReturn(func(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...any) error { + cm := obj.(*corev1.ConfigMap) + cm.Name = "cm1" + cm.Namespace = "ns" + err := mockClient.Get(ctx, client.ObjectKeyFromObject(cm), cm) + if err != nil { + return err + } + if len(cm.Data) == 0 { + return errors.New("expect data in configmap") + } + if _, ok := cm.Data["hook.yaml"]; !ok { + return errors.New("expect hook.yaml as key in data") + } + expected, err := yaml.Marshal(map[string]string{ + "x": "y", + }) + if err != nil { + return err + } + if cm.Data["hook.yaml"] != string(expected) { + return fmt.Errorf("content not match, expected: %s, got: %s", cm.Data["hook.yaml"], string(expected)) + } + return nil }) - if err != nil { - return err - } - if cm.Data["hook.yaml"] != string(expected) { - return fmt.Errorf("content not match, expected: %s, got: %s", cm.Data["hook.yaml"], string(expected)) - } - return nil - }) + }) t.Run("not call client.Update if configmap not changed", func(t *testing.T) { gomock.InOrder( @@ -130,7 +130,7 @@ func TestReconcileConfigMaps_Existed(t *testing.T) { Get(gomock.Any(), gomock.Any(), gomock.AssignableToTypeOf(&corev1.Secret{})). Return(k8sErrors.NewNotFound(schema.GroupResource{}, "mockErr")).Times(2), ) - err = r.ReconcileConfigMaps(ctx, mc) + err := r.ReconcileConfigMaps(ctx, mc) assert.NoError(t, err) }) @@ -150,7 +150,31 @@ func TestReconcileConfigMaps_Existed(t *testing.T) { Get(gomock.Any(), gomock.Any(), gomock.AssignableToTypeOf(&corev1.Secret{})). Return(k8sErrors.NewNotFound(schema.GroupResource{}, "mockErr")).Times(2), ) - err = r.ReconcileConfigMaps(ctx, mc) + err := r.ReconcileConfigMaps(ctx, mc) + assert.NoError(t, err) + }) + + t.Run("rocksmq config not overwrited", func(t *testing.T) { + mc.Spec.Dep.MsgStreamType = v1beta1.MsgStreamTypeRocksMQ + mc.Spec.Conf.Data = map[string]interface{}{ + "rocksmq": map[string]interface{}{ + "a": "b", + "c": "d", + }, + } + cm := &corev1.ConfigMap{} + cm.Namespace = "ns" + cm.Name = "cm1" + // get secret of minio + mockClient.EXPECT(). + Get(gomock.Any(), gomock.Any(), gomock.AssignableToTypeOf(&corev1.Secret{})). + Return(k8sErrors.NewNotFound(schema.GroupResource{}, "mockErr")).Times(1) + + err := r.updateConfigMap(ctx, mc, cm) assert.NoError(t, err) + assert.Equal(t, map[string]interface{}{ + "a": "b", + "c": "d", + }, mc.Spec.Conf.Data["rocksmq"]) }) }