Skip to content

Commit

Permalink
pushgw transfer time series data to kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
lwb0214 committed Jan 25, 2025
1 parent 0a94394 commit 02140e9
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 31 deletions.
5 changes: 5 additions & 0 deletions etc/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ MaxIdleConnsPerHost = 100
# Replacement = "$1:80"
# TargetLabel = "__address__"

# [[Pushgw.KafkaWriters]]
# Typ = "sync"
# Brokers = ["127.0.0.1:9092"]
# Topic = "test"

[Ibex]
Enable = false
RPCListen = "0.0.0.0:20090"
126 changes: 126 additions & 0 deletions pushgw/kafka/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package kafka

import (
"fmt"

"github.com/IBM/sarama"
"github.com/prometheus/client_golang/prometheus"
)

const (
AsyncProducer = "async"
SyncProducer = "sync"
)

var (
KafkaProducerSuccess = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "kafka_producer_message_success_total",
Help: "Total number of successful messages sent to Kafka.",
},
[]string{"producer_type"},
)

KafkaProducerError = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "kafka_producer_message_error_total",
Help: "Total number of errors encountered while sending messages to Kafka.",
},
[]string{"producer_type"},
)
)

func init() {
prometheus.MustRegister(
KafkaProducerSuccess,
KafkaProducerError,
)
}

type (
Producer interface {
Send(*sarama.ProducerMessage) error
Close() error
}

AsyncProducerWrapper struct {
asyncProducer sarama.AsyncProducer
stop chan struct{}
}

SyncProducerWrapper struct {
syncProducer sarama.SyncProducer
stop chan struct{}
}
)

func New(typ string, brokers []string, config *sarama.Config) (Producer, error) {
stop := make(chan struct{})
switch typ {
case AsyncProducer:
p, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
return nil, err
}
apw := &AsyncProducerWrapper{
asyncProducer: p,
stop: stop,
}
go apw.errorWorker()
go apw.successWorker()
return apw, nil
case SyncProducer:
if !config.Producer.Return.Successes {
config.Producer.Return.Successes = true
}
p, err := sarama.NewSyncProducer(brokers, config)
return &SyncProducerWrapper{syncProducer: p}, err
default:
return nil, fmt.Errorf("unknown producer type: %s", typ)
}
}

func (p *AsyncProducerWrapper) Send(msg *sarama.ProducerMessage) error {
p.asyncProducer.Input() <- msg
return nil
}

func (p *AsyncProducerWrapper) Close() error {
close(p.stop)
return p.asyncProducer.Close()
}

func (p *AsyncProducerWrapper) errorWorker() {
for {
select {
case <-p.asyncProducer.Errors():
KafkaProducerError.WithLabelValues(AsyncProducer).Inc()
case <-p.stop:
return
}
}
}

func (p *AsyncProducerWrapper) successWorker() {
for {
select {
case <-p.asyncProducer.Successes():
KafkaProducerSuccess.WithLabelValues(AsyncProducer).Inc()
case <-p.stop:
return
}
}
}

func (p *SyncProducerWrapper) Send(msg *sarama.ProducerMessage) error {
_, _, err := p.syncProducer.SendMessage(msg)
if err == nil {
KafkaProducerSuccess.WithLabelValues(SyncProducer).Inc()
}
return err
}

func (p *SyncProducerWrapper) Close() error {
close(p.stop)
return p.syncProducer.Close()
}
11 changes: 11 additions & 0 deletions pushgw/pconf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"log"
"regexp"

"github.com/IBM/sarama"
"github.com/ccfos/nightingale/v6/pkg/tlsx"

"github.com/prometheus/common/model"
Expand All @@ -21,6 +22,7 @@ type Pushgw struct {
DropSample []map[string]string
WriterOpt WriterGlobalOpt
Writers []WriterOptions
KafkaWriters []KafkaWriterOptions
}

type WriterGlobalOpt struct {
Expand Down Expand Up @@ -56,6 +58,15 @@ type WriterOptions struct {
tlsx.ClientConfig
}

type KafkaWriterOptions struct {
Typ string
Brokers []string
Topic string
*sarama.Config

WriteRelabels []*RelabelConfig
}

type RelabelConfig struct {
SourceLabels model.LabelNames `json:"source_labels"`
Separator string `json:"separator"`
Expand Down
54 changes: 54 additions & 0 deletions pushgw/writer/kafka_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package writer

import (
"time"

"github.com/IBM/sarama"
"github.com/ccfos/nightingale/v6/pushgw/kafka"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/prometheus/prometheus/prompb"
"github.com/toolkits/pkg/logger"
)

type KafkaWriterType struct {
Opts pconf.KafkaWriterOptions
ForceUseServerTS bool
Client kafka.Producer
RetryCount int
RetryInterval int64 // 单位秒
}

func (w KafkaWriterType) Write(key string, items []prompb.TimeSeries, headers ...map[string]string) {
if len(items) == 0 {
return
}

items = Relabel(items, w.Opts.WriteRelabels)
if len(items) == 0 {
return
}

data, err := beforeWrite(key, items, w.ForceUseServerTS, ForwardKafkaDuration)
if err != nil {
logger.Warningf("marshal prom data to proto got error: %v, data: %+v", err, items)
return
}

for i := 0; i < w.RetryCount; i++ {
err := w.Client.Send(&sarama.ProducerMessage{Topic: w.Opts.Topic,
Key: sarama.StringEncoder(key), Value: sarama.ByteEncoder(data)})
if err == nil {
break
}

CounterWirteErrorTotal.WithLabelValues(key).Add(float64(len(items)))
logger.Warningf("send to kafka got error: %v in %d times, broker: %v, topic: %s",
err, i, w.Opts.Brokers, w.Opts.Topic)

if i == 0 {
logger.Warning("example timeseries:", items[0].String())
}

time.Sleep(time.Duration(w.RetryInterval) * time.Second)
}
}
13 changes: 13 additions & 0 deletions pushgw/writer/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ func Process(labels []prompb.Label, cfgs ...*pconf.RelabelConfig) []prompb.Label
return labels
}

func Relabel(items []prompb.TimeSeries, rc []*pconf.RelabelConfig) []prompb.TimeSeries {
ritems := make([]prompb.TimeSeries, 0, len(items))
for _, item := range items {
lbls := Process(item.Labels, rc...)
if len(lbls) == 0 {
continue
}
item.Labels = lbls
ritems = append(ritems, item)
}
return ritems
}

func getValue(ls []prompb.Label, name model.LabelName) string {
for _, l := range ls {
if l.Name == string(name) {
Expand Down
11 changes: 11 additions & 0 deletions pushgw/writer/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ var (
}, []string{"url"},
)

ForwardKafkaDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Buckets: []float64{.1, 1, 10},
Name: "forward_kafka_duration_seconds",
Help: "Forward samples to Kafka. latencies in seconds.",
}, []string{"brokers_topic"},
)

GaugeSampleQueueSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Expand Down Expand Up @@ -67,6 +77,7 @@ var (
func init() {
prometheus.MustRegister(
ForwardDuration,
ForwardKafkaDuration,
CounterWirteTotal,
CounterWirteErrorTotal,
CounterPushQueueErrorTotal,
Expand Down
Loading

0 comments on commit 02140e9

Please sign in to comment.