Skip to content

Commit

Permalink
Support sasl health probe (#38)
Browse files Browse the repository at this point in the history
Signed-off-by: shaoyue.chen <[email protected]>
  • Loading branch information
haorenfsa authored Nov 10, 2023
1 parent ea52e3d commit ecd9c75
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 18 deletions.
8 changes: 8 additions & 0 deletions config/samples/milvus_kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ metadata:
labels:
app: milvus
spec:
config:
kafka:
# securityPolicy supports: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
securityPolicy: PLAINTEXT
# saslMechanisms supports: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
saslMechanisms: PLAIN
saslUsername: ""
saslPassword: ""
dependencies:
msgStreamType: kafka
kafka:
Expand Down
8 changes: 4 additions & 4 deletions pkg/controllers/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func GetCondition(getter func() v1beta1.MilvusCondition, eps []string) v1beta1.M
}

var (
wrapKafkaConditonGetter = func(ctx context.Context, logger logr.Logger, p v1beta1.MilvusKafka) func() v1beta1.MilvusCondition {
return func() v1beta1.MilvusCondition { return GetKafkaCondition(ctx, logger, p) }
wrapKafkaConditonGetter = func(ctx context.Context, logger logr.Logger, p v1beta1.MilvusKafka, cfg external.CheckKafkaConfig) func() v1beta1.MilvusCondition {
return func() v1beta1.MilvusCondition { return GetKafkaCondition(ctx, logger, p, cfg) }
}
wrapPulsarConditonGetter = func(ctx context.Context, logger logr.Logger, p v1beta1.MilvusPulsar) func() v1beta1.MilvusCondition {
return func() v1beta1.MilvusCondition { return GetPulsarCondition(ctx, logger, p) }
Expand Down Expand Up @@ -95,8 +95,8 @@ var msgStreamReadyCondition = v1beta1.MilvusCondition{

var checkKafka = external.CheckKafka

func GetKafkaCondition(ctx context.Context, logger logr.Logger, p v1beta1.MilvusKafka) v1beta1.MilvusCondition {
err := checkKafka(p.BrokerList)
func GetKafkaCondition(ctx context.Context, logger logr.Logger, p v1beta1.MilvusKafka, cfg external.CheckKafkaConfig) v1beta1.MilvusCondition {
err := checkKafka(cfg)
if err != nil {
return newErrMsgStreamCondResult(v1beta1.ReasonMsgStreamNotReady, err.Error())
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/controllers/conditions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestWrapGetters(t *testing.T) {
ctx := context.TODO()
logger := logf.Log
t.Run("kafka", func(t *testing.T) {
fn := wrapKafkaConditonGetter(ctx, logger, v1beta1.MilvusKafka{})
fn := wrapKafkaConditonGetter(ctx, logger, v1beta1.MilvusKafka{}, external.CheckKafkaConfig{})
fn()
})
t.Run("pulsar", func(t *testing.T) {
Expand All @@ -95,12 +95,12 @@ func getMockPulsarNewClient(cli pulsar.Client, err error) func(options pulsar.Cl
}

func TestGetKafkaCondition(t *testing.T) {
checkKafka = func([]string) error { return nil }
ret := GetKafkaCondition(context.TODO(), logf.Log.WithName("test"), v1beta1.MilvusKafka{})
checkKafka = func(external.CheckKafkaConfig) error { return nil }
ret := GetKafkaCondition(context.TODO(), logf.Log.WithName("test"), v1beta1.MilvusKafka{}, external.CheckKafkaConfig{})
assert.Equal(t, corev1.ConditionTrue, ret.Status)

checkKafka = func([]string) error { return errors.New("failed") }
ret = GetKafkaCondition(context.TODO(), logf.Log.WithName("test"), v1beta1.MilvusKafka{})
checkKafka = func(external.CheckKafkaConfig) error { return errors.New("failed") }
ret = GetKafkaCondition(context.TODO(), logf.Log.WithName("test"), v1beta1.MilvusKafka{}, external.CheckKafkaConfig{})
assert.Equal(t, corev1.ConditionFalse, ret.Status)
}

Expand Down
29 changes: 28 additions & 1 deletion pkg/controllers/status_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1"
"github.com/milvus-io/milvus-operator/pkg/config"
"github.com/milvus-io/milvus-operator/pkg/external"
)

//go:generate mockgen -package=controllers -source=status_cluster.go -destination=status_cluster_mock.go
Expand Down Expand Up @@ -366,6 +367,23 @@ func (r *MilvusStatusSyncer) GetMilvusEndpoint(ctx context.Context, mc v1beta1.M
return GetMilvusEndpoint(ctx, r.logger, r.Client, info)
}

// GetKafkaConfFromCR get kafka config from CR
func GetKafkaConfFromCR(mc v1beta1.Milvus) (*external.CheckKafkaConfig, error) {
kafkaConf := new(external.CheckKafkaConfig)
allConf := mc.Spec.Conf
kafkaConfData, exist := allConf.Data["kafka"]
if exist {
kafkaConfValues := v1beta1.Values{
Data: kafkaConfData.(map[string]interface{}),
}
err := kafkaConfValues.AsObject(kafkaConf)
if err != nil {
return nil, errors.Wrap(err, "decode kafka config failed")
}
}
return kafkaConf, nil
}

func (r *MilvusStatusSyncer) GetMsgStreamCondition(
ctx context.Context, mc v1beta1.Milvus) (v1beta1.MilvusCondition, error) {
var eps = []string{}
Expand All @@ -375,7 +393,16 @@ func (r *MilvusStatusSyncer) GetMsgStreamCondition(
// rocksmq / natsmq is built in, assume ok
return msgStreamReadyCondition, nil
case v1beta1.MsgStreamTypeKafka:
getter = wrapKafkaConditonGetter(ctx, r.logger, mc.Spec.Dep.Kafka)
kafkaConf, err := GetKafkaConfFromCR(mc)
if err != nil {
return v1beta1.MilvusCondition{
Type: v1beta1.MsgStreamReady,
Status: corev1.ConditionUnknown,
Message: err.Error(),
}, nil
}
kafkaConf.BrokerList = mc.Spec.Dep.Kafka.BrokerList
getter = wrapKafkaConditonGetter(ctx, r.logger, mc.Spec.Dep.Kafka, *kafkaConf)
eps = mc.Spec.Dep.Kafka.BrokerList
default:
// default pulsar
Expand Down
93 changes: 89 additions & 4 deletions pkg/external/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,110 @@ package external

import (
"context"
"crypto/tls"
"time"

"github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1"
"github.com/milvus-io/milvus-operator/pkg/util"
"github.com/pkg/errors"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
)

func CheckKafka(brokerList []string) error {
type CheckKafkaConfig struct {
BrokerList []string `json:"-"`
SecurityProtocol string `json:"securityProtocol"`
SASLMechanisms string `json:"saslMechanisms"`
SASLUsername string `json:"saslUsername"`
SASLPassword string `json:"saslPassword"`
}

// GetKafkaConfFromCR get check kafka config from CR
func GetKafkaConfFromCR(mc v1beta1.Milvus) (*CheckKafkaConfig, error) {
kafkaConf := &CheckKafkaConfig{}
allConf := mc.Spec.Conf
kafkaConfData, exist := allConf.Data["kafka"]
if exist {
kafkaConfValues := v1beta1.Values{
Data: kafkaConfData.(map[string]interface{}),
}
err := kafkaConfValues.AsObject(kafkaConf)
if err != nil {
return nil, errors.Wrap(err, "decode kafka config failed")
}
}
return kafkaConf, nil
}

// GetKafkaDialer returns a kafka.Dialer with tls and sasl configured
func GetKafkaDialer(conf CheckKafkaConfig) (*kafka.Dialer, error) {
useTls := false
useSasl := false
switch conf.SecurityProtocol {
case "SASL_PLAINTEXT":
useSasl = true
case "SASL_SSL":
useTls = true
useSasl = true
case "SSL":
useTls = true
case "PLAINTEXT", "":
default:
return nil, errors.Errorf("unspported security protocol: %s", conf.SecurityProtocol)
}

var err error
var dialer *kafka.Dialer
var tlsConfig *tls.Config
var saslMechanism sasl.Mechanism
if useTls {
tlsConfig = &tls.Config{}
}
if useSasl {
switch conf.SASLMechanisms {
case "SCRAM-SHA-256":
saslMechanism, err = scram.Mechanism(scram.SHA256, conf.SASLUsername, conf.SASLPassword)
case "SCRAM-SHA-512":
saslMechanism, err = scram.Mechanism(scram.SHA512, conf.SASLUsername, conf.SASLPassword)
case "PLAIN", "":
saslMechanism = &plain.Mechanism{Username: conf.SASLUsername, Password: conf.SASLPassword}
default:
err = errors.Errorf("unspported SASL mechanism: %s", conf.SASLMechanisms)
}
if err != nil {
return nil, err
}
}
dialer = &kafka.Dialer{
TLS: tlsConfig,
SASLMechanism: saslMechanism,
Timeout: DependencyCheckTimeout,
DualStack: true,
}
return dialer, nil
}

func CheckKafka(conf CheckKafkaConfig) error {
// make a new reader that consumes from _milvus-operator, partition 0, at offset 0
if len(brokerList) == 0 {
if len(conf.BrokerList) == 0 {
return errors.New("broker list is empty")
}

dialer, err := GetKafkaDialer(conf)
if err != nil {
return errors.Wrap(err, "get kafka dialer failed")
}

r := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokerList,
Dialer: dialer,
Brokers: conf.BrokerList,
Topic: "_milvus-operator",
})
defer r.Close()
var checkKafka = func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), DependencyCheckTimeout)
defer cancel()
err := r.SetOffsetAt(ctx, time.Now())
return errors.Wrap(err, "check consume offset from borker failed")
Expand Down
133 changes: 130 additions & 3 deletions pkg/external/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,139 @@ package external
import (
"testing"

"github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1"
"github.com/stretchr/testify/assert"
)

func TestCheckKafkaFailed(t *testing.T) {
err := CheckKafka([]string{})
assert.Error(t, err)
err = CheckKafka([]string{"dummy:9092"})
conf := CheckKafkaConfig{}
var err error
t.Run("no broker list failed", func(t *testing.T) {
err := CheckKafka(conf)
assert.Error(t, err)
})

t.Run("probe broker failed", func(t *testing.T) {
conf.BrokerList = []string{"dummy:9092"}
err = CheckKafka(conf)
assert.Error(t, err)
})

t.Run("get dialer failed", func(t *testing.T) {
conf.SecurityProtocol = "bad"
err = CheckKafka(conf)
assert.Error(t, err)
})
}

func TestGetKafkaDialer(t *testing.T) {
conf := CheckKafkaConfig{}
t.Run("default no tls, no sasl", func(t *testing.T) {
dialer, err := GetKafkaDialer(conf)
assert.NoError(t, err)
assert.Nil(t, dialer.TLS)
assert.Nil(t, dialer.SASLMechanism)
})

t.Run("securityProtocol=PLAINTEXT", func(t *testing.T) {
conf.SecurityProtocol = "PLAINTEXT"
dialer, err := GetKafkaDialer(conf)
assert.NoError(t, err)
assert.Nil(t, dialer.TLS)
assert.Nil(t, dialer.SASLMechanism)
})

t.Run("securityProtocol=SSL", func(t *testing.T) {
conf.SecurityProtocol = "SSL"
dialer, err := GetKafkaDialer(conf)
assert.NoError(t, err)
assert.NotNil(t, dialer.TLS)
assert.Nil(t, dialer.SASLMechanism)
})

t.Run("securityProtocol=SASL_PLAINTEXT", func(t *testing.T) {
conf.SecurityProtocol = "SASL_PLAINTEXT"
dialer, err := GetKafkaDialer(conf)
assert.NoError(t, err)
assert.Nil(t, dialer.TLS)
assert.NotNil(t, dialer.SASLMechanism)
})

t.Run("securityProtocol=SASL_SSL", func(t *testing.T) {
conf.SecurityProtocol = "SASL_SSL"
dialer, err := GetKafkaDialer(conf)
assert.NoError(t, err)
assert.NotNil(t, dialer.TLS)
assert.NotNil(t, dialer.SASLMechanism)
})

t.Run("securityProtocol=notSupport", func(t *testing.T) {
conf.SecurityProtocol = "notSupport"
_, err := GetKafkaDialer(conf)
assert.Error(t, err)
})

t.Run("saslMechanism=PLAIN", func(t *testing.T) {
conf.SecurityProtocol = "SASL_SSL"
conf.SASLMechanisms = "PLAIN"
dialer, err := GetKafkaDialer(conf)
assert.NoError(t, err)
assert.NotNil(t, dialer.TLS)
assert.Equal(t, "PLAIN", dialer.SASLMechanism.Name())
})

t.Run("saslMechanism=SCRAM-SHA-256", func(t *testing.T) {
conf.SecurityProtocol = "SASL_SSL"
conf.SASLMechanisms = "SCRAM-SHA-256"
dialer, err := GetKafkaDialer(conf)
assert.NoError(t, err)
assert.NotNil(t, dialer.TLS)
assert.Equal(t, "SCRAM-SHA-256", dialer.SASLMechanism.Name())
})

t.Run("saslMechanism=SCRAM-SHA-512", func(t *testing.T) {
conf.SecurityProtocol = "SASL_SSL"
conf.SASLMechanisms = "SCRAM-SHA-512"
dialer, err := GetKafkaDialer(conf)
assert.NoError(t, err)
assert.NotNil(t, dialer.TLS)
assert.Equal(t, "SCRAM-SHA-512", dialer.SASLMechanism.Name())
})

t.Run("saslMechanism=notSupport", func(t *testing.T) {
conf.SecurityProtocol = "SASL_SSL"
conf.SASLMechanisms = "notSupport"
_, err := GetKafkaDialer(conf)
assert.Error(t, err)
})
}

func TestGetKafkaConfFromCR(t *testing.T) {
mc := v1beta1.Milvus{}
conf, err := GetKafkaConfFromCR(mc)
assert.NoError(t, err)
assert.Equal(t, CheckKafkaConfig{}, *conf)

mc.Spec.Conf.Data = map[string]interface{}{
"kafka": map[string]interface{}{
"securityProtocol": "SASL_PLAINTEXT",
"saslMechanisms": "PLAIN",
"saslUsername": "test",
"saslPassword": "testp",
},
}
conf, err = GetKafkaConfFromCR(mc)
assert.NoError(t, err)
assert.Equal(t, "SASL_PLAINTEXT", conf.SecurityProtocol)
assert.Equal(t, "PLAIN", conf.SASLMechanisms)
assert.Equal(t, "test", conf.SASLUsername)
assert.Equal(t, "testp", conf.SASLPassword)

mc.Spec.Conf.Data = map[string]interface{}{
"kafka": map[string]interface{}{
"securityProtocol": 1,
},
}
_, err = GetKafkaConfFromCR(mc)
assert.Error(t, err)
}
4 changes: 3 additions & 1 deletion pkg/external/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ type CheckMinIOArgs struct {
IAMEndpoint string
}

var DependencyCheckTimeout = 5 * time.Second

func CheckMinIO(args CheckMinIOArgs) error {
var checkMinio = func() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), DependencyCheckTimeout)
defer cancel()

switch args.Type {
Expand Down

0 comments on commit ecd9c75

Please sign in to comment.