diff --git a/etc/config.toml b/etc/config.toml
index 9d564776e..d31152679 100644
--- a/etc/config.toml
+++ b/etc/config.toml
@@ -174,6 +174,11 @@ MaxIdleConnsPerHost = 100
 # Replacement = "$1:80"
 # TargetLabel = "__address__"
 
+# [[Pushgw.KafkaWriters]]
+# Typ = "sync"
+# Brokers = ["127.0.0.1:9092"]
+# Topic = "test"
+
 [Ibex]
 Enable = false
 RPCListen = "0.0.0.0:20090"
\ No newline at end of file
diff --git a/pushgw/kafka/producer.go b/pushgw/kafka/producer.go
new file mode 100644
index 000000000..96a9f09ae
--- /dev/null
+++ b/pushgw/kafka/producer.go
@@ -0,0 +1,126 @@
+package kafka
+
+import (
+    "fmt"
+
+    "github.com/IBM/sarama"
+    "github.com/prometheus/client_golang/prometheus"
+)
+
+const (
+    AsyncProducer = "async"
+    SyncProducer  = "sync"
+)
+
+var (
+    KafkaProducerSuccess = prometheus.NewCounterVec(
+        prometheus.CounterOpts{
+            Name: "kafka_producer_message_success_total",
+            Help: "Total number of successful messages sent to Kafka.",
+        },
+        []string{"producer_type"},
+    )
+
+    KafkaProducerError = prometheus.NewCounterVec(
+        prometheus.CounterOpts{
+            Name: "kafka_producer_message_error_total",
+            Help: "Total number of errors encountered while sending messages to Kafka.",
+        },
+        []string{"producer_type"},
+    )
+)
+
+func init() {
+    prometheus.MustRegister(
+        KafkaProducerSuccess,
+        KafkaProducerError,
+    )
+}
+
+type (
+    Producer interface {
+        Send(*sarama.ProducerMessage) error
+        Close() error
+    }
+
+    AsyncProducerWrapper struct {
+        asyncProducer sarama.AsyncProducer
+        stop          chan struct{}
+    }
+
+    SyncProducerWrapper struct {
+        syncProducer sarama.SyncProducer
+        stop         chan struct{}
+    }
+)
+
+func New(typ string, brokers []string, config *sarama.Config) (Producer, error) {
+    stop := make(chan struct{})
+    switch typ {
+    case AsyncProducer:
+        p, err := sarama.NewAsyncProducer(brokers, config)
+        if err != nil {
+            return nil, err
+        }
+        apw := &AsyncProducerWrapper{
+            asyncProducer: p,
+            stop:          stop,
+        }
+        go apw.errorWorker()
+        go apw.successWorker()
+        return apw, nil
+    case SyncProducer:
+        if !config.Producer.Return.Successes {
+            config.Producer.Return.Successes = true
+        }
+        p, err := sarama.NewSyncProducer(brokers, config)
+        return &SyncProducerWrapper{syncProducer: p, stop: stop}, err
+    default:
+        return nil, fmt.Errorf("unknown producer type: %s", typ)
+    }
+}
+
+func (p *AsyncProducerWrapper) Send(msg *sarama.ProducerMessage) error {
+    p.asyncProducer.Input() <- msg
+    return nil
+}
+
+func (p *AsyncProducerWrapper) Close() error {
+    close(p.stop)
+    return p.asyncProducer.Close()
+}
+
+func (p *AsyncProducerWrapper) errorWorker() {
+    for {
+        select {
+        case <-p.asyncProducer.Errors():
+            KafkaProducerError.WithLabelValues(AsyncProducer).Inc()
+        case <-p.stop:
+            return
+        }
+    }
+}
+
+func (p *AsyncProducerWrapper) successWorker() {
+    for {
+        select {
+        case <-p.asyncProducer.Successes():
+            KafkaProducerSuccess.WithLabelValues(AsyncProducer).Inc()
+        case <-p.stop:
+            return
+        }
+    }
+}
+
+func (p *SyncProducerWrapper) Send(msg *sarama.ProducerMessage) error {
+    _, _, err := p.syncProducer.SendMessage(msg)
+    if err == nil {
+        KafkaProducerSuccess.WithLabelValues(SyncProducer).Inc()
+    }
+    return err
+}
+
+func (p *SyncProducerWrapper) Close() error {
+    close(p.stop)
+    return p.syncProducer.Close()
+}
diff --git a/pushgw/pconf/conf.go b/pushgw/pconf/conf.go
index 5b4243b95..b9f44d743 100644
--- a/pushgw/pconf/conf.go
+++ b/pushgw/pconf/conf.go
@@ -4,6 +4,7 @@ import (
     "log"
     "regexp"
 
+    "github.com/IBM/sarama"
     "github.com/ccfos/nightingale/v6/pkg/tlsx"
 
     "github.com/prometheus/common/model"
@@ -21,6 +22,7 @@ type Pushgw struct {
     DropSample   []map[string]string
     WriterOpt    WriterGlobalOpt
     Writers      []WriterOptions
+    KafkaWriters []KafkaWriterOptions
 }
 
 type WriterGlobalOpt struct {
@@ -56,6 +58,15 @@ type WriterOptions struct {
     tlsx.ClientConfig
 }
 
+type KafkaWriterOptions struct {
+    Typ     string
+    Brokers []string
+    Topic   string
+    *sarama.Config
+
+    WriteRelabels []*RelabelConfig
+}
+
 type RelabelConfig struct {
     SourceLabels model.LabelNames `json:"source_labels"`
     Separator    string           `json:"separator"`
diff --git a/pushgw/writer/kafka_writer.go b/pushgw/writer/kafka_writer.go
new file mode 100644
index 000000000..3aea80793
--- /dev/null
+++ b/pushgw/writer/kafka_writer.go
@@ -0,0 +1,54 @@
+package writer
+
+import (
+    "time"
+
+    "github.com/IBM/sarama"
+    "github.com/ccfos/nightingale/v6/pushgw/kafka"
+    "github.com/ccfos/nightingale/v6/pushgw/pconf"
+    "github.com/prometheus/prometheus/prompb"
+    "github.com/toolkits/pkg/logger"
+)
+
+type KafkaWriterType struct {
+    Opts             pconf.KafkaWriterOptions
+    ForceUseServerTS bool
+    Client           kafka.Producer
+    RetryCount       int
+    RetryInterval    int64 // unit: seconds
+}
+
+func (w KafkaWriterType) Write(key string, items []prompb.TimeSeries, headers ...map[string]string) {
+    if len(items) == 0 {
+        return
+    }
+
+    items = Relabel(items, w.Opts.WriteRelabels)
+    if len(items) == 0 {
+        return
+    }
+
+    data, err := beforeWrite(key, items, w.ForceUseServerTS, ForwardKafkaDuration)
+    if err != nil {
+        logger.Warningf("marshal prom data to proto got error: %v, data: %+v", err, items)
+        return
+    }
+
+    for i := 0; i < w.RetryCount; i++ {
+        err := w.Client.Send(&sarama.ProducerMessage{Topic: w.Opts.Topic,
+            Key: sarama.StringEncoder(key), Value: sarama.ByteEncoder(data)})
+        if err == nil {
+            break
+        }
+
+        CounterWirteErrorTotal.WithLabelValues(key).Add(float64(len(items)))
+        logger.Warningf("send to kafka got error: %v in %d times, broker: %v, topic: %s",
+            err, i, w.Opts.Brokers, w.Opts.Topic)
+
+        if i == 0 {
+            logger.Warning("example timeseries:", items[0].String())
+        }
+
+        time.Sleep(time.Duration(w.RetryInterval) * time.Second)
+    }
+}
diff --git a/pushgw/writer/relabel.go b/pushgw/writer/relabel.go
index 38d9a6b6d..c67b2bbdf 100644
--- a/pushgw/writer/relabel.go
+++ b/pushgw/writer/relabel.go
@@ -37,6 +37,19 @@ func Process(labels []prompb.Label, cfgs ...*pconf.RelabelConfig) []prompb.Label
     return labels
 }
 
+func Relabel(items []prompb.TimeSeries, rc []*pconf.RelabelConfig) []prompb.TimeSeries {
+    ritems := make([]prompb.TimeSeries, 0, len(items))
+    for _, item := range items {
+        lbls := Process(item.Labels, rc...)
+        if len(lbls) == 0 {
+            continue
+        }
+        item.Labels = lbls
+        ritems = append(ritems, item)
+    }
+    return ritems
+}
+
 func getValue(ls []prompb.Label, name model.LabelName) string {
     for _, l := range ls {
         if l.Name == string(name) {
diff --git a/pushgw/writer/stats.go b/pushgw/writer/stats.go
index 738484eda..9d6e4bff3 100644
--- a/pushgw/writer/stats.go
+++ b/pushgw/writer/stats.go
@@ -19,6 +19,16 @@ var (
         }, []string{"url"},
     )
 
+    ForwardKafkaDuration = prometheus.NewHistogramVec(
+        prometheus.HistogramOpts{
+            Namespace: namespace,
+            Subsystem: subsystem,
+            Buckets:   []float64{.1, 1, 10},
+            Name:      "forward_kafka_duration_seconds",
+            Help:      "Forward samples to Kafka. Latencies in seconds.",
+        }, []string{"brokers_topic"},
+    )
+
     GaugeSampleQueueSize = prometheus.NewGaugeVec(
         prometheus.GaugeOpts{
             Namespace: namespace,
@@ -67,6 +77,7 @@ var (
 func init() {
     prometheus.MustRegister(
         ForwardDuration,
+        ForwardKafkaDuration,
         CounterWirteTotal,
         CounterWirteErrorTotal,
         CounterPushQueueErrorTotal,
diff --git a/pushgw/writer/writer.go b/pushgw/writer/writer.go
index cd76a199e..0233c6f48 100644
--- a/pushgw/writer/writer.go
+++ b/pushgw/writer/writer.go
@@ -11,12 +11,15 @@ import (
     "sync/atomic"
     "time"
 
+    "github.com/IBM/sarama"
     "github.com/ccfos/nightingale/v6/pkg/fasttime"
+    "github.com/ccfos/nightingale/v6/pushgw/kafka"
     "github.com/ccfos/nightingale/v6/pushgw/pconf"
 
     "github.com/golang/protobuf/proto"
     "github.com/golang/snappy"
     "github.com/prometheus/client_golang/api"
+    "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/prometheus/prompb"
     "github.com/toolkits/pkg/logger"
 )
@@ -29,36 +32,14 @@ type WriterType struct {
     RetryInterval int64 // unit: seconds
 }
 
-func (w WriterType) writeRelabel(items []prompb.TimeSeries) []prompb.TimeSeries {
-    ritems := make([]prompb.TimeSeries, 0, len(items))
-    for _, item := range items {
-        lbls := Process(item.Labels, w.Opts.WriteRelabels...)
-        if len(lbls) == 0 {
-            continue
-        }
-        item.Labels = lbls
-        ritems = append(ritems, item)
-    }
-    return ritems
-}
-
-func (w WriterType) Write(key string, items []prompb.TimeSeries, headers ...map[string]string) {
-    if len(items) == 0 {
-        return
-    }
-
-    items = w.writeRelabel(items)
-    if len(items) == 0 {
-        return
-    }
-
+func beforeWrite(key string, items []prompb.TimeSeries, forceUseServerTS bool,
+    forwardDuration *prometheus.HistogramVec) ([]byte, error) {
     CounterWirteTotal.WithLabelValues(key).Add(float64(len(items)))
     start := time.Now()
     defer func() {
-        ForwardDuration.WithLabelValues(key).Observe(time.Since(start).Seconds())
+        forwardDuration.WithLabelValues(key).Observe(time.Since(start).Seconds())
     }()
-
-    if w.ForceUseServerTS {
+    if forceUseServerTS {
         ts := int64(fasttime.UnixTimestamp()) * 1000
         for i := 0; i < len(items); i++ {
             if len(items[i].Samples) == 0 {
@@ -72,7 +53,20 @@ func (w WriterType) Write(key string, items []prompb.TimeSeries, headers ...map[
         Timeseries: items,
     }
 
-    data, err := proto.Marshal(req)
+    return proto.Marshal(req)
+}
+
+func (w WriterType) Write(key string, items []prompb.TimeSeries, headers ...map[string]string) {
+    if len(items) == 0 {
+        return
+    }
+
+    items = Relabel(items, w.Opts.WriteRelabels)
+    if len(items) == 0 {
+        return
+    }
+
+    data, err := beforeWrite(key, items, w.ForceUseServerTS, ForwardDuration)
     if err != nil {
         logger.Warningf("marshal prom data to proto got error: %v, data: %+v", err, items)
         return
@@ -159,7 +153,7 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error {
 
 type WritersType struct {
     pushgw      pconf.Pushgw
-    backends    map[string]WriterType
+    backends    map[string]Writer
     queues      map[string]*IdentQueue
     AllQueueLen atomic.Value
     sync.RWMutex
@@ -198,7 +192,7 @@ func (ws *WritersType) SetAllQueueLen() {
 
 func NewWriters(pushgwConfig pconf.Pushgw) *WritersType {
     writers := &WritersType{
-        backends:    make(map[string]WriterType),
+        backends:    make(map[string]Writer),
         queues:      make(map[string]*IdentQueue),
         pushgw:      pushgwConfig,
         AllQueueLen: atomic.Value{},
@@ -211,7 +205,7 @@
-func (ws *WritersType) Put(name string, writer WriterType) {
+func (ws *WritersType) Put(name string, writer Writer) {
     ws.backends[name] = writer
 }
@@ -272,6 +266,10 @@ func (ws *WritersType) PushSample(ident string, v interface{}) error {
     return nil
 }
 
+type Writer interface {
+    Write(string, []prompb.TimeSeries, ...map[string]string)
+}
+
 func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
     for {
         select {
@@ -292,9 +290,18 @@ func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
 }
 
 func (ws *WritersType) Init() error {
-    opts := ws.pushgw.Writers
     ws.AllQueueLen.Store(0)
 
+    if err := ws.initWriters(); err != nil {
+        return err
+    }
+
+    return ws.initKafkaWriters()
+}
+
+func (ws *WritersType) initWriters() error {
+    opts := ws.pushgw.Writers
+
     for i := 0; i < len(opts); i++ {
         tlsConf, err := opts[i].ClientConfig.TLSConfig()
         if err != nil {
@@ -342,3 +349,30 @@ func (ws *WritersType) Init() error {
 
     return nil
 }
+
+func (ws *WritersType) initKafkaWriters() error {
+    opts := ws.pushgw.KafkaWriters
+
+    for i := 0; i < len(opts); i++ {
+        cfg := opts[i].Config
+        if cfg == nil {
+            cfg = sarama.NewConfig()
+        }
+        producer, err := kafka.New(opts[i].Typ, opts[i].Brokers, cfg)
+        if err != nil {
+            logger.Warningf("new kafka producer got error: %v", err)
+            return err
+        }
+
+        writer := KafkaWriterType{
+            Opts:             opts[i],
+            ForceUseServerTS: ws.pushgw.ForceUseServerTS,
+            Client:           producer,
+            RetryCount:       ws.pushgw.WriterOpt.RetryCount,
+            RetryInterval:    ws.pushgw.WriterOpt.RetryInterval,
+        }
+        ws.Put(fmt.Sprintf("%v_%s", opts[i].Brokers, opts[i].Topic), writer)
+    }
+
+    return nil
+}
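
Not part of the patch: a minimal standalone sketch of how the new pushgw/kafka package could be driven, assuming a local broker at 127.0.0.1:9092 and a "test" topic (both placeholders taken from the sample config above).

package main

import (
    "log"

    "github.com/IBM/sarama"
    "github.com/ccfos/nightingale/v6/pushgw/kafka"
)

func main() {
    // Build a sync producer via the new constructor; kafka.New sets
    // Producer.Return.Successes=true, which sarama requires for sync producers.
    cfg := sarama.NewConfig()
    p, err := kafka.New(kafka.SyncProducer, []string{"127.0.0.1:9092"}, cfg)
    if err != nil {
        log.Fatalf("create producer: %v", err)
    }
    defer p.Close()

    // Send one message; with the sync producer, Send blocks until the broker acks.
    err = p.Send(&sarama.ProducerMessage{
        Topic: "test",
        Value: sarama.StringEncoder("hello from pushgw"),
    })
    if err != nil {
        log.Fatalf("send message: %v", err)
    }
}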