diff --git a/internal/relay/common.go b/internal/relay/common.go
index 17902f7..dd2fd8b 100644
--- a/internal/relay/common.go
+++ b/internal/relay/common.go
@@ -20,8 +20,17 @@ import (
 	"github.com/twmb/franz-go/pkg/sasl/scram"
 )
 
+const relayMetricPrefix = "kafka_relay_"
+
 var (
-	RelayMetric = "kafka_relay_msg_count{source=\"%s\", destination=\"%s\", partition=\"%d\"}"
+	SrcNetworkErrMetric = relayMetricPrefix + "source_errors_total{source=\"%d\", error=\"%s\"}"
+	SrcsUnhealthyMetric = relayMetricPrefix + "sources_unhealthy_total"
+	SrcKafkaErrMetric   = relayMetricPrefix + "source_kafka_errors_total{source=\"%d\", error=\"%s\"}"
+	SrcHealthMetric     = relayMetricPrefix + "source_highwatermark{source=\"%d\"}"
+
+	TargetNetworkErrMetric = relayMetricPrefix + "target_errors_total{error=\"%s\"}"
+	TargetKafkaErrMetric   = relayMetricPrefix + "target_kafka_errors_total{error=\"%s\"}"
+	RelayedMsgsMetric      = relayMetricPrefix + "msgs_total{source=\"%s\", destination=\"%s\", partition=\"%d\"}"
 
 	ErrLaggingBehind = fmt.Errorf("topic end offset is lagging behind")
 )
diff --git a/internal/relay/source_pool.go b/internal/relay/source_pool.go
index 0ae9810..0f197bf 100644
--- a/internal/relay/source_pool.go
+++ b/internal/relay/source_pool.go
@@ -9,6 +9,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/VictoriaMetrics/metrics"
 	"github.com/twmb/franz-go/pkg/kadm"
 	"github.com/twmb/franz-go/pkg/kgo"
 )
@@ -48,10 +49,11 @@ type Server struct {
 
 // SourcePool manages the source Kafka instances and consumption.
 type SourcePool struct {
-	cfg    SourcePoolCfg
-	client *kgo.Client
-	log    *slog.Logger
-	topics []string
+	cfg     SourcePoolCfg
+	client  *kgo.Client
+	log     *slog.Logger
+	metrics *metrics.Set
+	topics  []string
 
 	offsets map[string]map[int32]kgo.Offset
 
@@ -82,7 +84,7 @@ var (
 
 // NewSourcePool returns a controller instance that manages the lifecycle of a pool of N source (consumer)
 // servers. The pool always attempts to find one healthy node for the relay to consume from.
-func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, topics Topics, log *slog.Logger) (*SourcePool, error) {
+func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, topics Topics, m *metrics.Set, log *slog.Logger) (*SourcePool, error) {
 	servers := make([]Server, 0, len(serverCfgs))
 
 	// Initially mark all servers as unhealthy.
@@ -105,6 +107,7 @@ func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, topics Topi
 		topics:    topicNames,
 		servers:   servers,
 		log:       log,
+		metrics:   m,
 		backoffFn: getBackoffFn(cfg.EnableBackoff, cfg.BackoffMin, cfg.BackoffMax),
 	}, nil
 }
@@ -154,6 +157,7 @@ loop:
 		conn, err := sp.newConn(globalCtx, s)
 		if err != nil {
 			retries++
+			sp.metrics.GetOrCreateCounter(fmt.Sprintf(SrcNetworkErrMetric, s.ID, "new connection failed")).Inc()
 			sp.log.Error("new source connection failed", "id", s.ID, "broker", s.Config.BootstrapBrokers, "error", err, "retries", retries)
 			waitTries(globalCtx, sp.backoffFn(retries))
 			continue loop
@@ -170,6 +174,7 @@
 		}
 
 		retries++
+		sp.metrics.GetOrCreateCounter(SrcsUnhealthyMetric).Inc()
 		sp.log.Error("no healthy server found. waiting and retrying", "retries", retries)
 		waitTries(globalCtx, sp.backoffFn(retries))
 	}
@@ -183,6 +188,7 @@ func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error) {
 
 	// There's no connection.
 	if fetches.IsClientClosed() {
+		sp.metrics.GetOrCreateCounter(fmt.Sprintf(SrcKafkaErrMetric, s.ID, "client closed")).Inc()
 		sp.log.Debug("retrieving fetches failed. client closed.", "id", s.ID, "broker", s.Config.BootstrapBrokers)
client closed.", "id", s.ID, "broker", s.Config.BootstrapBrokers) sp.setWeight(s.ID, unhealthyWeight) @@ -191,6 +197,7 @@ func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error) { // If there are errors in the fetches, handle them. for _, err := range fetches.Errors() { + sp.metrics.GetOrCreateCounter(fmt.Sprintf(SrcKafkaErrMetric, s.ID, "fetches error")).Inc() sp.log.Error("found error in fetches", "server", s.ID, "error", err.Err) sp.setWeight(s.ID, unhealthyWeight) @@ -513,6 +520,7 @@ func (sp *SourcePool) setWeight(id int, weight int64) { sp.curCandidate = s } + sp.metrics.GetOrCreateCounter(fmt.Sprintf(SrcHealthMetric, id)).Set(uint64(weight)) sp.log.Debug("setting candidate weight", "id", id, "weight", weight, "curr", sp.curCandidate) sp.servers[id] = s break diff --git a/internal/relay/target.go b/internal/relay/target.go index 43041a3..2ad4396 100644 --- a/internal/relay/target.go +++ b/internal/relay/target.go @@ -164,6 +164,7 @@ func (tg *Target) initProducer(top Topics) (*kgo.Client, error) { } else { tlsOpt, err := getTLSConfig(tg.pCfg.CACertPath, tg.pCfg.ClientCertPath, tg.pCfg.ClientKeyPath) if err != nil { + tg.metrics.GetOrCreateCounter(fmt.Sprintf(TargetKafkaErrMetric, "tls config error")).Inc() return nil, err } @@ -188,6 +189,7 @@ outerLoop: default: cl, err = kgo.NewClient(opts...) if err != nil { + tg.metrics.GetOrCreateCounter(fmt.Sprintf(TargetNetworkErrMetric, "error creating producer client")).Inc() tg.log.Error("error creating producer client", "error", err) retries++ waitTries(tg.ctx, backoff(retries)) @@ -208,6 +210,7 @@ outerLoop: // Test connectivity and ensure destination topics exists. if err := testConnection(cl, tg.pCfg.SessionTimeout, topics, partitions); err != nil { + tg.metrics.GetOrCreateCounter(fmt.Sprintf(TargetNetworkErrMetric, "error connecting to producer")).Inc() tg.log.Error("error connecting to producer", "err", err) retries++ waitTries(tg.ctx, backoff(retries)) @@ -295,13 +298,14 @@ retry: destTopic = tg.targetTopics[res.Record.Topic] part = res.Record.Partition ) - tg.metrics.GetOrCreateCounter(fmt.Sprintf(RelayMetric, srcTopic, destTopic, part)).Inc() + tg.metrics.GetOrCreateCounter(fmt.Sprintf(RelayedMsgsMetric, srcTopic, destTopic, part)).Inc() } tg.log.Debug("produced last offset", "offset", results[len(results)-1].Record.Offset, "batch", batchLen, "retry", retries) // retry if there is an error if err != nil { + tg.metrics.GetOrCreateCounter(fmt.Sprintf(TargetKafkaErrMetric, "error producing message")).Inc() tg.log.Error("error producing message", "err", err, "failed_count", batchLen, "retry", retries) bufRecs := tg.client.BufferedProduceRecords() @@ -333,6 +337,7 @@ retry: } if !sent { + tg.metrics.GetOrCreateCounter(fmt.Sprintf(TargetKafkaErrMetric, "error producing message after retries")).Inc() return fmt.Errorf("error producing message; exhausted retries (%v)", tg.pCfg.MaxRetries) } diff --git a/main.go b/main.go index 6a435b1..b0f9418 100644 --- a/main.go +++ b/main.go @@ -64,7 +64,7 @@ func main() { } // Initialize the source Kafka (consumer) relay. - srcPool, err := relay.NewSourcePool(initSourcePoolConfig(ko), consumerCfgs, topics, lo) + srcPool, err := relay.NewSourcePool(initSourcePoolConfig(ko), consumerCfgs, topics, metr, lo) if err != nil { log.Fatalf("error initializing source pool controller: %v", err) }