Skip to content

Commit

Permalink
fix rocksmq config (#219)
Browse files Browse the repository at this point in the history
Signed-off-by: haorenfsa <[email protected]>
  • Loading branch information
haorenfsa authored Dec 6, 2024
1 parent ab99941 commit fb5c64d
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 35 deletions.
4 changes: 3 additions & 1 deletion pkg/controllers/configmaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ func (r *MilvusReconciler) updateConfigMap(ctx context.Context, mc v1beta1.Milvu
delete(conf, "rocksmq")
case v1beta1.MsgStreamTypeRocksMQ:
// adhoc: to let the merger know we're using rocksmq config
conf["rocksmq"] = map[string]interface{}{}
if conf["rocksmq"] == nil {
conf["rocksmq"] = map[string]interface{}{}
}
// delete other mq config to make milvus use rocksmq
delete(conf, "pulsar")
delete(conf, "kafka")
Expand Down
92 changes: 58 additions & 34 deletions pkg/controllers/configmaps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"fmt"
"testing"

"github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/yaml"
)
Expand Down Expand Up @@ -82,37 +82,37 @@ func TestReconcileConfigMaps_Existed(t *testing.T) {
mockClient.EXPECT().
Update(gomock.Any(), gomock.Any()).Return(nil),
)
})

err := r.ReconcileConfigMaps(ctx, mc)
assert.NoError(t, err)
mockClient.EXPECT().
Get(gomock.Any(), gomock.Any(), gomock.AssignableToTypeOf(&corev1.ConfigMap{})).
DoAndReturn(func(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...any) error {
cm := obj.(*corev1.ConfigMap)
cm.Name = "cm1"
cm.Namespace = "ns"
err := mockClient.Get(ctx, client.ObjectKeyFromObject(cm), cm)
if err != nil {
return err
}
if len(cm.Data) == 0 {
return errors.New("expect data in configmap")
}
if _, ok := cm.Data["hook.yaml"]; !ok {
return errors.New("expect hook.yaml as key in data")
}
expected, err := yaml.Marshal(map[string]string{
"x": "y",
err := r.ReconcileConfigMaps(ctx, mc)
assert.NoError(t, err)
mockClient.EXPECT().
Get(gomock.Any(), gomock.Any(), gomock.AssignableToTypeOf(&corev1.ConfigMap{})).
DoAndReturn(func(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...any) error {
cm := obj.(*corev1.ConfigMap)
cm.Name = "cm1"
cm.Namespace = "ns"
err := mockClient.Get(ctx, client.ObjectKeyFromObject(cm), cm)
if err != nil {
return err
}
if len(cm.Data) == 0 {
return errors.New("expect data in configmap")
}
if _, ok := cm.Data["hook.yaml"]; !ok {
return errors.New("expect hook.yaml as key in data")
}
expected, err := yaml.Marshal(map[string]string{
"x": "y",
})
if err != nil {
return err
}
if cm.Data["hook.yaml"] != string(expected) {
return fmt.Errorf("content not match, expected: %s, got: %s", cm.Data["hook.yaml"], string(expected))
}
return nil
})
if err != nil {
return err
}
if cm.Data["hook.yaml"] != string(expected) {
return fmt.Errorf("content not match, expected: %s, got: %s", cm.Data["hook.yaml"], string(expected))
}
return nil
})
})

t.Run("not call client.Update if configmap not changed", func(t *testing.T) {
gomock.InOrder(
Expand All @@ -130,7 +130,7 @@ func TestReconcileConfigMaps_Existed(t *testing.T) {
Get(gomock.Any(), gomock.Any(), gomock.AssignableToTypeOf(&corev1.Secret{})).
Return(k8sErrors.NewNotFound(schema.GroupResource{}, "mockErr")).Times(2),
)
err = r.ReconcileConfigMaps(ctx, mc)
err := r.ReconcileConfigMaps(ctx, mc)
assert.NoError(t, err)
})

Expand All @@ -150,7 +150,31 @@ func TestReconcileConfigMaps_Existed(t *testing.T) {
Get(gomock.Any(), gomock.Any(), gomock.AssignableToTypeOf(&corev1.Secret{})).
Return(k8sErrors.NewNotFound(schema.GroupResource{}, "mockErr")).Times(2),
)
err = r.ReconcileConfigMaps(ctx, mc)
err := r.ReconcileConfigMaps(ctx, mc)
assert.NoError(t, err)
})

t.Run("rocksmq config not overwrited", func(t *testing.T) {
mc.Spec.Dep.MsgStreamType = v1beta1.MsgStreamTypeRocksMQ
mc.Spec.Conf.Data = map[string]interface{}{
"rocksmq": map[string]interface{}{
"a": "b",
"c": "d",
},
}
cm := &corev1.ConfigMap{}
cm.Namespace = "ns"
cm.Name = "cm1"
// get secret of minio
mockClient.EXPECT().
Get(gomock.Any(), gomock.Any(), gomock.AssignableToTypeOf(&corev1.Secret{})).
Return(k8sErrors.NewNotFound(schema.GroupResource{}, "mockErr")).Times(1)

err := r.updateConfigMap(ctx, mc, cm)
assert.NoError(t, err)
assert.Equal(t, map[string]interface{}{
"a": "b",
"c": "d",
}, mc.Spec.Conf.Data["rocksmq"])
})
}

0 comments on commit fb5c64d

Please sign in to comment.