Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <[email protected]>
  • Loading branch information
Yisaer committed Jan 9, 2025
1 parent 0c6f4a0 commit 5ee3e8f
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 31 deletions.
13 changes: 9 additions & 4 deletions extensions/impl/kafka/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ import (
)

const (
LblRequest = "req"
LblWriteMsgs = "write"
LblMessage = "message"
LblException = "exception"
)

var (
Expand All @@ -32,12 +31,18 @@ var (
Subsystem: "io",
Name: "kafka_count",
Help: "counter of Kafka IO",
}, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType})
}, []string{metrics.LblType, metrics.LblIOType, metrics.LblStatusType, metrics.LblRuleIDType, metrics.LblOpIDType})

KafkaHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
KafkaDurationHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "kuiper",
Subsystem: "io",
Name: "kafka_duration",
Help: "Historgram of Kafka IO",
Buckets: prometheus.ExponentialBuckets(10, 2, 20), // 10us ~ 5s
}, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType})
)

func init() {
prometheus.MustRegister(KafkaCounter)
prometheus.MustRegister(KafkaDurationHist)
}
65 changes: 39 additions & 26 deletions extensions/impl/kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
kafkago "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl"

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/pkg/util"
"github.com/lf-edge/ekuiper/v2/metrics"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
Expand All @@ -40,20 +41,28 @@ type KafkaSink struct {
headerTemplate string
saslConf *saslConf
mechanism sasl.Mechanism
LastStats kafkago.WriterStats
}

type kafkaConf struct {
Brokers string `json:"brokers"`
Topic string `json:"topic"`
MaxAttempts int `json:"maxAttempts"`
RequiredACKs int `json:"requiredACKs"`
Key string `json:"key"`
Headers interface{} `json:"headers"`
Brokers string `json:"brokers"`
Topic string `json:"topic"`
MaxAttempts int `json:"maxAttempts"`
RequiredACKs int `json:"requiredACKs"`
Key string `json:"key"`
Headers interface{} `json:"headers"`
WriterConf kafkaWriterConf `json:"writerConf"`

// write config
Compression string `json:"compression"`
}

type kafkaWriterConf struct {
BatchSize int `json:"batchSize"`
BatchTimeout time.Duration `json:"batchTimeout"`
BatchBytes int64 `json:"batchBytes"`
}

func (c *kafkaConf) validate() error {
if c.Topic == "" {
return fmt.Errorf("topic can not be empty")
Expand All @@ -65,10 +74,7 @@ func (c *kafkaConf) validate() error {
}

func (k *KafkaSink) Provision(ctx api.StreamContext, configs map[string]any) error {
c := &kafkaConf{
RequiredACKs: -1,
MaxAttempts: 1,
}
c := getDefaultKafkaConf()
err := cast.MapToStruct(configs, c)
failpoint.Inject("kafkaErr", func(val failpoint.Value) {
err = mockKakfaSourceErr(val.(int), castConfErr)
Expand Down Expand Up @@ -149,7 +155,9 @@ func (k *KafkaSink) buildKafkaWriter() error {
AllowAutoTopicCreation: true,
MaxAttempts: k.kc.MaxAttempts,
RequiredAcks: kafkago.RequiredAcks(k.kc.RequiredACKs),
BatchSize: 1,
BatchSize: k.kc.WriterConf.BatchSize,
BatchBytes: k.kc.WriterConf.BatchBytes,
BatchTimeout: k.kc.WriterConf.BatchTimeout,
Transport: &kafkago.Transport{
SASL: k.mechanism,
TLS: k.tlsConfig,
Expand All @@ -176,29 +184,20 @@ func (k *KafkaSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler)

func (k *KafkaSink) Collect(ctx api.StreamContext, item api.MessageTuple) (err error) {
defer func() {
if err != nil {
KafkaCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
}
KafkaCounter.WithLabelValues(LblMessage, metrics.LblSinkIO, metrics.GetStatusValue(err), ctx.GetRuleId(), ctx.GetOpId()).Inc()

Check warning on line 187 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L187

Added line #L187 was not covered by tests
}()
msgs, err := k.collect(ctx, item)
if err != nil {
return err
}
KafkaCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
KafkaCounter.WithLabelValues(LblMessage, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Add(float64(len(msgs)))
start := time.Now()
defer func() {
KafkaHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
KafkaDurationHist.WithLabelValues(LblWriteMsgs, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))

Check warning on line 195 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L195

Added line #L195 was not covered by tests
}()
return k.writer.WriteMessages(ctx, msgs...)
}

func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) (err error) {
defer func() {
if err != nil {
KafkaCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
}
}()
allMsgs := make([]kafkago.Message, 0)
items.RangeOfTuples(func(index int, tuple api.MessageTuple) bool {
msgs, err := k.collect(ctx, tuple)
Expand All @@ -208,13 +207,14 @@ func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleLis
allMsgs = append(allMsgs, msgs...)
return true
})
KafkaCounter.WithLabelValues(LblMessage, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Add(float64(len(allMsgs)))
KafkaCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
start := time.Now()
defer func() {
KafkaHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
conf.Log.Infof("send kafka cost %v", time.Since(start).String())
KafkaDurationHist.WithLabelValues(LblWriteMsgs, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))

Check warning on line 213 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L212-L213

Added lines #L212 - L213 were not covered by tests
}()
return k.writer.WriteMessages(ctx, allMsgs...)
err = k.writer.WriteMessages(ctx, allMsgs...)
KafkaCounter.WithLabelValues(LblMessage, metrics.LblSinkIO, metrics.GetStatusValue(err), ctx.GetRuleId(), ctx.GetOpId()).Add(float64(len(allMsgs)))
return err

Check warning on line 217 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L215-L217

Added lines #L215 - L217 were not covered by tests
}

func (k *KafkaSink) collect(ctx api.StreamContext, item api.MessageTuple) ([]kafkago.Message, error) {
Expand Down Expand Up @@ -338,3 +338,16 @@ var (
_ api.TupleCollector = &KafkaSink{}
_ util.PingableConn = &KafkaSink{}
)

func getDefaultKafkaConf() *kafkaConf {
c := &kafkaConf{
RequiredACKs: -1,
MaxAttempts: 1,
WriterConf: kafkaWriterConf{
BatchSize: 100,
BatchTimeout: time.Millisecond,
BatchBytes: 1048576,
},
}
return c
}
2 changes: 1 addition & 1 deletion extensions/impl/kafka/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (k *KafkaSource) Subscribe(ctx api.StreamContext, ingest api.BytesIngest, i
ingestError(ctx, err)
continue
}
KafkaCounter.WithLabelValues(LblMessage, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
KafkaCounter.WithLabelValues(LblMessage, metrics.LblSourceIO, metrics.LblSuccess, ctx.GetRuleId(), ctx.GetOpId()).Inc()

Check warning on line 197 in extensions/impl/kafka/source.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/source.go#L197

Added line #L197 was not covered by tests
ingest(ctx, msg.Value, nil, timex.GetNow())
}
}
Expand Down
9 changes: 9 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,17 @@ const (
LblRuleStop = "stop"
LblSourceIO = "source"
LblSinkIO = "sink"
LblException = "err"
LblSuccess = "success"
)

func GetStatusValue(err error) string {
if err == nil {
return LblSuccess
}
return LblException

Check warning on line 38 in metrics/metrics.go

View check run for this annotation

Codecov / codecov/patch

metrics/metrics.go#L34-L38

Added lines #L34 - L38 were not covered by tests
}

var (
RuleStatusCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "kuiper",
Expand Down

0 comments on commit 5ee3e8f

Please sign in to comment.