From ecd9c75c2f3eec69d5cf5cb4e26f56f6c862fa39 Mon Sep 17 00:00:00 2001 From: shaoyue Date: Fri, 10 Nov 2023 18:40:10 +0800 Subject: [PATCH] Support sasl health probe (#38) Signed-off-by: shaoyue.chen --- config/samples/milvus_kafka.yaml | 8 ++ pkg/controllers/conditions.go | 8 +- pkg/controllers/conditions_test.go | 10 +-- pkg/controllers/status_cluster.go | 29 ++++++- pkg/external/kafka.go | 93 +++++++++++++++++++- pkg/external/kafka_test.go | 133 ++++++++++++++++++++++++++++- pkg/external/minio.go | 4 +- 7 files changed, 267 insertions(+), 18 deletions(-) diff --git a/config/samples/milvus_kafka.yaml b/config/samples/milvus_kafka.yaml index 404bfe4b..dfc30d7c 100644 --- a/config/samples/milvus_kafka.yaml +++ b/config/samples/milvus_kafka.yaml @@ -6,6 +6,14 @@ metadata: labels: app: milvus spec: + config: + kafka: + # securityPolicy supports: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL + securityPolicy: PLAINTEXT + # saslMechanisms supports: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 + saslMechanisms: PLAIN + saslUsername: "" + saslPassword: "" dependencies: msgStreamType: kafka kafka: diff --git a/pkg/controllers/conditions.go b/pkg/controllers/conditions.go index 9bc26fa6..1f4ac248 100644 --- a/pkg/controllers/conditions.go +++ b/pkg/controllers/conditions.go @@ -46,8 +46,8 @@ func GetCondition(getter func() v1beta1.MilvusCondition, eps []string) v1beta1.M } var ( - wrapKafkaConditonGetter = func(ctx context.Context, logger logr.Logger, p v1beta1.MilvusKafka) func() v1beta1.MilvusCondition { - return func() v1beta1.MilvusCondition { return GetKafkaCondition(ctx, logger, p) } + wrapKafkaConditonGetter = func(ctx context.Context, logger logr.Logger, p v1beta1.MilvusKafka, cfg external.CheckKafkaConfig) func() v1beta1.MilvusCondition { + return func() v1beta1.MilvusCondition { return GetKafkaCondition(ctx, logger, p, cfg) } } wrapPulsarConditonGetter = func(ctx context.Context, logger logr.Logger, p v1beta1.MilvusPulsar) func() v1beta1.MilvusCondition { return func() v1beta1.MilvusCondition { return GetPulsarCondition(ctx, logger, p) } @@ -95,8 +95,8 @@ var msgStreamReadyCondition = v1beta1.MilvusCondition{ var checkKafka = external.CheckKafka -func GetKafkaCondition(ctx context.Context, logger logr.Logger, p v1beta1.MilvusKafka) v1beta1.MilvusCondition { - err := checkKafka(p.BrokerList) +func GetKafkaCondition(ctx context.Context, logger logr.Logger, p v1beta1.MilvusKafka, cfg external.CheckKafkaConfig) v1beta1.MilvusCondition { + err := checkKafka(cfg) if err != nil { return newErrMsgStreamCondResult(v1beta1.ReasonMsgStreamNotReady, err.Error()) } diff --git a/pkg/controllers/conditions_test.go b/pkg/controllers/conditions_test.go index fb336b1b..01a7cd66 100644 --- a/pkg/controllers/conditions_test.go +++ b/pkg/controllers/conditions_test.go @@ -68,7 +68,7 @@ func TestWrapGetters(t *testing.T) { ctx := context.TODO() logger := logf.Log t.Run("kafka", func(t *testing.T) { - fn := wrapKafkaConditonGetter(ctx, logger, v1beta1.MilvusKafka{}) + fn := wrapKafkaConditonGetter(ctx, logger, v1beta1.MilvusKafka{}, external.CheckKafkaConfig{}) fn() }) t.Run("pulsar", func(t *testing.T) { @@ -95,12 +95,12 @@ func getMockPulsarNewClient(cli pulsar.Client, err error) func(options pulsar.Cl } func TestGetKafkaCondition(t *testing.T) { - checkKafka = func([]string) error { return nil } - ret := GetKafkaCondition(context.TODO(), logf.Log.WithName("test"), v1beta1.MilvusKafka{}) + checkKafka = func(external.CheckKafkaConfig) error { return nil } + ret := GetKafkaCondition(context.TODO(), logf.Log.WithName("test"), v1beta1.MilvusKafka{}, external.CheckKafkaConfig{}) assert.Equal(t, corev1.ConditionTrue, ret.Status) - checkKafka = func([]string) error { return errors.New("failed") } - ret = GetKafkaCondition(context.TODO(), logf.Log.WithName("test"), v1beta1.MilvusKafka{}) + checkKafka = func(external.CheckKafkaConfig) error { return errors.New("failed") } + ret = GetKafkaCondition(context.TODO(), logf.Log.WithName("test"), v1beta1.MilvusKafka{}, external.CheckKafkaConfig{}) assert.Equal(t, corev1.ConditionFalse, ret.Status) } diff --git a/pkg/controllers/status_cluster.go b/pkg/controllers/status_cluster.go index 31884df9..4705f33e 100644 --- a/pkg/controllers/status_cluster.go +++ b/pkg/controllers/status_cluster.go @@ -20,6 +20,7 @@ import ( "github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1" "github.com/milvus-io/milvus-operator/pkg/config" + "github.com/milvus-io/milvus-operator/pkg/external" ) //go:generate mockgen -package=controllers -source=status_cluster.go -destination=status_cluster_mock.go @@ -366,6 +367,23 @@ func (r *MilvusStatusSyncer) GetMilvusEndpoint(ctx context.Context, mc v1beta1.M return GetMilvusEndpoint(ctx, r.logger, r.Client, info) } +// GetKafkaConfFromCR get kafka config from CR +func GetKafkaConfFromCR(mc v1beta1.Milvus) (*external.CheckKafkaConfig, error) { + kafkaConf := new(external.CheckKafkaConfig) + allConf := mc.Spec.Conf + kafkaConfData, exist := allConf.Data["kafka"] + if exist { + kafkaConfValues := v1beta1.Values{ + Data: kafkaConfData.(map[string]interface{}), + } + err := kafkaConfValues.AsObject(kafkaConf) + if err != nil { + return nil, errors.Wrap(err, "decode kafka config failed") + } + } + return kafkaConf, nil +} + func (r *MilvusStatusSyncer) GetMsgStreamCondition( ctx context.Context, mc v1beta1.Milvus) (v1beta1.MilvusCondition, error) { var eps = []string{} @@ -375,7 +393,16 @@ func (r *MilvusStatusSyncer) GetMsgStreamCondition( // rocksmq / natsmq is built in, assume ok return msgStreamReadyCondition, nil case v1beta1.MsgStreamTypeKafka: - getter = wrapKafkaConditonGetter(ctx, r.logger, mc.Spec.Dep.Kafka) + kafkaConf, err := GetKafkaConfFromCR(mc) + if err != nil { + return v1beta1.MilvusCondition{ + Type: v1beta1.MsgStreamReady, + Status: corev1.ConditionUnknown, + Message: err.Error(), + }, nil + } + kafkaConf.BrokerList = mc.Spec.Dep.Kafka.BrokerList + getter = wrapKafkaConditonGetter(ctx, r.logger, mc.Spec.Dep.Kafka, *kafkaConf) eps = mc.Spec.Dep.Kafka.BrokerList default: // default pulsar diff --git a/pkg/external/kafka.go b/pkg/external/kafka.go index fdfb2fe7..232ee307 100644 --- a/pkg/external/kafka.go +++ b/pkg/external/kafka.go @@ -2,25 +2,110 @@ package external import ( "context" + "crypto/tls" "time" + "github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1" "github.com/milvus-io/milvus-operator/pkg/util" "github.com/pkg/errors" "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl" + "github.com/segmentio/kafka-go/sasl/plain" + "github.com/segmentio/kafka-go/sasl/scram" ) -func CheckKafka(brokerList []string) error { +type CheckKafkaConfig struct { + BrokerList []string `json:"-"` + SecurityProtocol string `json:"securityProtocol"` + SASLMechanisms string `json:"saslMechanisms"` + SASLUsername string `json:"saslUsername"` + SASLPassword string `json:"saslPassword"` +} + +// GetKafkaConfFromCR get check kafka config from CR +func GetKafkaConfFromCR(mc v1beta1.Milvus) (*CheckKafkaConfig, error) { + kafkaConf := &CheckKafkaConfig{} + allConf := mc.Spec.Conf + kafkaConfData, exist := allConf.Data["kafka"] + if exist { + kafkaConfValues := v1beta1.Values{ + Data: kafkaConfData.(map[string]interface{}), + } + err := kafkaConfValues.AsObject(kafkaConf) + if err != nil { + return nil, errors.Wrap(err, "decode kafka config failed") + } + } + return kafkaConf, nil +} + +// GetKafkaDialer returns a kafka.Dialer with tls and sasl configured +func GetKafkaDialer(conf CheckKafkaConfig) (*kafka.Dialer, error) { + useTls := false + useSasl := false + switch conf.SecurityProtocol { + case "SASL_PLAINTEXT": + useSasl = true + case "SASL_SSL": + useTls = true + useSasl = true + case "SSL": + useTls = true + case "PLAINTEXT", "": + default: + return nil, errors.Errorf("unspported security protocol: %s", conf.SecurityProtocol) + } + + var err error + var dialer *kafka.Dialer + var tlsConfig *tls.Config + var saslMechanism sasl.Mechanism + if useTls { + tlsConfig = &tls.Config{} + } + if useSasl { + switch conf.SASLMechanisms { + case "SCRAM-SHA-256": + saslMechanism, err = scram.Mechanism(scram.SHA256, conf.SASLUsername, conf.SASLPassword) + case "SCRAM-SHA-512": + saslMechanism, err = scram.Mechanism(scram.SHA512, conf.SASLUsername, conf.SASLPassword) + case "PLAIN", "": + saslMechanism = &plain.Mechanism{Username: conf.SASLUsername, Password: conf.SASLPassword} + default: + err = errors.Errorf("unspported SASL mechanism: %s", conf.SASLMechanisms) + } + if err != nil { + return nil, err + } + } + dialer = &kafka.Dialer{ + TLS: tlsConfig, + SASLMechanism: saslMechanism, + Timeout: DependencyCheckTimeout, + DualStack: true, + } + return dialer, nil +} + +func CheckKafka(conf CheckKafkaConfig) error { // make a new reader that consumes from _milvus-operator, partition 0, at offset 0 - if len(brokerList) == 0 { + if len(conf.BrokerList) == 0 { return errors.New("broker list is empty") } + + dialer, err := GetKafkaDialer(conf) + if err != nil { + return errors.Wrap(err, "get kafka dialer failed") + } + r := kafka.NewReader(kafka.ReaderConfig{ - Brokers: brokerList, + Dialer: dialer, + Brokers: conf.BrokerList, Topic: "_milvus-operator", }) defer r.Close() var checkKafka = func() error { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), DependencyCheckTimeout) defer cancel() err := r.SetOffsetAt(ctx, time.Now()) return errors.Wrap(err, "check consume offset from borker failed") diff --git a/pkg/external/kafka_test.go b/pkg/external/kafka_test.go index 49f200bc..dbbd67ee 100644 --- a/pkg/external/kafka_test.go +++ b/pkg/external/kafka_test.go @@ -3,12 +3,139 @@ package external import ( "testing" + "github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1" "github.com/stretchr/testify/assert" ) func TestCheckKafkaFailed(t *testing.T) { - err := CheckKafka([]string{}) - assert.Error(t, err) - err = CheckKafka([]string{"dummy:9092"}) + conf := CheckKafkaConfig{} + var err error + t.Run("no broker list failed", func(t *testing.T) { + err := CheckKafka(conf) + assert.Error(t, err) + }) + + t.Run("probe broker failed", func(t *testing.T) { + conf.BrokerList = []string{"dummy:9092"} + err = CheckKafka(conf) + assert.Error(t, err) + }) + + t.Run("get dialer failed", func(t *testing.T) { + conf.SecurityProtocol = "bad" + err = CheckKafka(conf) + assert.Error(t, err) + }) +} + +func TestGetKafkaDialer(t *testing.T) { + conf := CheckKafkaConfig{} + t.Run("default no tls, no sasl", func(t *testing.T) { + dialer, err := GetKafkaDialer(conf) + assert.NoError(t, err) + assert.Nil(t, dialer.TLS) + assert.Nil(t, dialer.SASLMechanism) + }) + + t.Run("securityProtocol=PLAINTEXT", func(t *testing.T) { + conf.SecurityProtocol = "PLAINTEXT" + dialer, err := GetKafkaDialer(conf) + assert.NoError(t, err) + assert.Nil(t, dialer.TLS) + assert.Nil(t, dialer.SASLMechanism) + }) + + t.Run("securityProtocol=SSL", func(t *testing.T) { + conf.SecurityProtocol = "SSL" + dialer, err := GetKafkaDialer(conf) + assert.NoError(t, err) + assert.NotNil(t, dialer.TLS) + assert.Nil(t, dialer.SASLMechanism) + }) + + t.Run("securityProtocol=SASL_PLAINTEXT", func(t *testing.T) { + conf.SecurityProtocol = "SASL_PLAINTEXT" + dialer, err := GetKafkaDialer(conf) + assert.NoError(t, err) + assert.Nil(t, dialer.TLS) + assert.NotNil(t, dialer.SASLMechanism) + }) + + t.Run("securityProtocol=SASL_SSL", func(t *testing.T) { + conf.SecurityProtocol = "SASL_SSL" + dialer, err := GetKafkaDialer(conf) + assert.NoError(t, err) + assert.NotNil(t, dialer.TLS) + assert.NotNil(t, dialer.SASLMechanism) + }) + + t.Run("securityProtocol=notSupport", func(t *testing.T) { + conf.SecurityProtocol = "notSupport" + _, err := GetKafkaDialer(conf) + assert.Error(t, err) + }) + + t.Run("saslMechanism=PLAIN", func(t *testing.T) { + conf.SecurityProtocol = "SASL_SSL" + conf.SASLMechanisms = "PLAIN" + dialer, err := GetKafkaDialer(conf) + assert.NoError(t, err) + assert.NotNil(t, dialer.TLS) + assert.Equal(t, "PLAIN", dialer.SASLMechanism.Name()) + }) + + t.Run("saslMechanism=SCRAM-SHA-256", func(t *testing.T) { + conf.SecurityProtocol = "SASL_SSL" + conf.SASLMechanisms = "SCRAM-SHA-256" + dialer, err := GetKafkaDialer(conf) + assert.NoError(t, err) + assert.NotNil(t, dialer.TLS) + assert.Equal(t, "SCRAM-SHA-256", dialer.SASLMechanism.Name()) + }) + + t.Run("saslMechanism=SCRAM-SHA-512", func(t *testing.T) { + conf.SecurityProtocol = "SASL_SSL" + conf.SASLMechanisms = "SCRAM-SHA-512" + dialer, err := GetKafkaDialer(conf) + assert.NoError(t, err) + assert.NotNil(t, dialer.TLS) + assert.Equal(t, "SCRAM-SHA-512", dialer.SASLMechanism.Name()) + }) + + t.Run("saslMechanism=notSupport", func(t *testing.T) { + conf.SecurityProtocol = "SASL_SSL" + conf.SASLMechanisms = "notSupport" + _, err := GetKafkaDialer(conf) + assert.Error(t, err) + }) +} + +func TestGetKafkaConfFromCR(t *testing.T) { + mc := v1beta1.Milvus{} + conf, err := GetKafkaConfFromCR(mc) + assert.NoError(t, err) + assert.Equal(t, CheckKafkaConfig{}, *conf) + + mc.Spec.Conf.Data = map[string]interface{}{ + "kafka": map[string]interface{}{ + "securityProtocol": "SASL_PLAINTEXT", + "saslMechanisms": "PLAIN", + "saslUsername": "test", + "saslPassword": "testp", + }, + } + conf, err = GetKafkaConfFromCR(mc) + assert.NoError(t, err) + assert.Equal(t, "SASL_PLAINTEXT", conf.SecurityProtocol) + assert.Equal(t, "PLAIN", conf.SASLMechanisms) + assert.Equal(t, "test", conf.SASLUsername) + assert.Equal(t, "testp", conf.SASLPassword) + + mc.Spec.Conf.Data = map[string]interface{}{ + "kafka": map[string]interface{}{ + "securityProtocol": 1, + }, + } + _, err = GetKafkaConfFromCR(mc) assert.Error(t, err) } diff --git a/pkg/external/minio.go b/pkg/external/minio.go index ed1f09b8..34257120 100644 --- a/pkg/external/minio.go +++ b/pkg/external/minio.go @@ -26,9 +26,11 @@ type CheckMinIOArgs struct { IAMEndpoint string } +var DependencyCheckTimeout = 5 * time.Second + func CheckMinIO(args CheckMinIOArgs) error { var checkMinio = func() error { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), DependencyCheckTimeout) defer cancel() switch args.Type {