
Commit

Merge pull request #27 from kalbhor/main
feat: add error metrics to target and source
knadh authored Jul 26, 2024
2 parents 5fac087 + 9167e66 commit 05fce61
Showing 4 changed files with 30 additions and 8 deletions.
11 changes: 10 additions & 1 deletion internal/relay/common.go
@@ -20,8 +20,17 @@ import (
     "github.com/twmb/franz-go/pkg/sasl/scram"
 )
 
+const relayMetricPrefix = "kafka_relay_"
+
 var (
-    RelayMetric = "kafka_relay_msg_count{source=\"%s\", destination=\"%s\", partition=\"%d\"}"
+    SrcNetworkErrMetric = relayMetricPrefix + "source_errors_total{source=\"%d\", error=\"%s\"}"
+    SrcsUnhealthyMetric = relayMetricPrefix + "sources_unhealthy_total"
+    SrcKafkaErrMetric   = relayMetricPrefix + "source_kafka_errors_total{source=\"%d\", error=\"%s\"}"
+    SrcHealthMetric     = relayMetricPrefix + "source_highwatermark{source=\"%d\"}"
+
+    TargetNetworkErrMetric = relayMetricPrefix + "source_errors_total{source=\"%d\", error=\"%s\"}"
+    TargetKafkaErrMetric   = relayMetricPrefix + "target_kafka_errors_total{error=\"%s\"}"
+    RelayedMsgsMetric      = relayMetricPrefix + "msgs_total{source=\"%s\", destination=\"%s\", partition=\"%d\"}"
 
     ErrLaggingBehind = fmt.Errorf("topic end offset is lagging behind")
 )
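These variables are Prometheus-style series-name templates: the printf verbs inside the label braces are filled in at the call sites in source_pool.go and target.go below, and the fully formatted string becomes the series identity. A minimal sketch of that pattern, assuming a fresh VictoriaMetrics metrics.Set and illustrative source/error values (not taken from the relay code):

```go
package main

import (
    "fmt"

    "github.com/VictoriaMetrics/metrics"
)

// Same template shape as SrcNetworkErrMetric above.
const srcNetworkErrMetric = `kafka_relay_source_errors_total{source="%d", error="%s"}`

func main() {
    set := metrics.NewSet()

    // Fill in the placeholders, then increment. GetOrCreateCounter registers
    // the series on first use and returns the existing counter afterwards, so
    // each distinct source/error pair becomes its own series.
    set.GetOrCreateCounter(fmt.Sprintf(srcNetworkErrMetric, 0, "new connection failed")).Inc()
}
```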
18 changes: 13 additions & 5 deletions internal/relay/source_pool.go
@@ -9,6 +9,7 @@ import (
     "sync"
     "time"
 
+    "github.com/VictoriaMetrics/metrics"
     "github.com/twmb/franz-go/pkg/kadm"
     "github.com/twmb/franz-go/pkg/kgo"
 )
@@ -48,10 +49,11 @@ type Server struct {
 
 // SourcePool manages the source Kafka instances and consumption.
 type SourcePool struct {
-    cfg    SourcePoolCfg
-    client *kgo.Client
-    log    *slog.Logger
-    topics []string
+    cfg     SourcePoolCfg
+    client  *kgo.Client
+    log     *slog.Logger
+    metrics *metrics.Set
+    topics  []string
 
     offsets map[string]map[int32]kgo.Offset
 
@@ -82,7 +84,7 @@
 
 // NewSourcePool returns a controller instance that manages the lifecycle of a pool of N source (consumer)
 // servers. The pool always attempts to find one healthy node for the relay to consume from.
-func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, topics Topics, log *slog.Logger) (*SourcePool, error) {
+func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, topics Topics, m *metrics.Set, log *slog.Logger) (*SourcePool, error) {
     servers := make([]Server, 0, len(serverCfgs))
 
     // Initially mark all servers as unhealthy.
@@ -105,6 +107,7 @@ func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, topics Topi
         topics:    topicNames,
         servers:   servers,
         log:       log,
+        metrics:   m,
         backoffFn: getBackoffFn(cfg.EnableBackoff, cfg.BackoffMin, cfg.BackoffMax),
     }, nil
 }
@@ -154,6 +157,7 @@ loop:
         conn, err := sp.newConn(globalCtx, s)
         if err != nil {
             retries++
+            sp.metrics.GetOrCreateCounter(fmt.Sprintf(SrcNetworkErrMetric, s.ID, "new connection failed")).Inc()
             sp.log.Error("new source connection failed", "id", s.ID, "broker", s.Config.BootstrapBrokers, "error", err, "retries", retries)
             waitTries(globalCtx, sp.backoffFn(retries))
             continue loop
@@ -170,6 +174,7 @@ loop:
         }
 
         retries++
+        sp.metrics.GetOrCreateCounter(SrcsUnhealthyMetric).Inc()
         sp.log.Error("no healthy server found. waiting and retrying", "retries", retries)
         waitTries(globalCtx, sp.backoffFn(retries))
     }
@@ -183,6 +188,7 @@ func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error) {
 
     // There's no connection.
     if fetches.IsClientClosed() {
+        sp.metrics.GetOrCreateCounter(fmt.Sprintf(SrcKafkaErrMetric, s.ID, "client closed")).Inc()
         sp.log.Debug("retrieving fetches failed. client closed.", "id", s.ID, "broker", s.Config.BootstrapBrokers)
         sp.setWeight(s.ID, unhealthyWeight)
 
@@ -191,6 +197,7 @@ func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error) {
 
     // If there are errors in the fetches, handle them.
     for _, err := range fetches.Errors() {
+        sp.metrics.GetOrCreateCounter(fmt.Sprintf(SrcKafkaErrMetric, s.ID, "fetches error")).Inc()
         sp.log.Error("found error in fetches", "server", s.ID, "error", err.Err)
         sp.setWeight(s.ID, unhealthyWeight)
 
@@ -513,6 +520,7 @@ func (sp *SourcePool) setWeight(id int, weight int64) {
             sp.curCandidate = s
         }
 
+        sp.metrics.GetOrCreateCounter(fmt.Sprintf(SrcHealthMetric, id)).Set(uint64(weight))
         sp.log.Debug("setting candidate weight", "id", id, "weight", weight, "curr", sp.curCandidate)
         sp.servers[id] = s
         break
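The setWeight() change above writes the candidate's current weight into the source_highwatermark series with Set() rather than Inc(), so the series always reflects the most recently assigned value instead of a running total. A small sketch of that gauge-like pattern, assuming a fresh metrics.Set and an illustrative server ID and weight:

```go
package main

import (
    "fmt"

    "github.com/VictoriaMetrics/metrics"
)

// Same template shape as SrcHealthMetric in internal/relay/common.go.
const srcHealthMetric = `kafka_relay_source_highwatermark{source="%d"}`

func main() {
    set := metrics.NewSet()

    // Counter.Set overwrites the stored value rather than adding to it.
    // Server ID 3 and weight 100 are illustrative, not from the relay code.
    set.GetOrCreateCounter(fmt.Sprintf(srcHealthMetric, 3)).Set(uint64(100))
}
```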
7 changes: 6 additions & 1 deletion internal/relay/target.go
@@ -164,6 +164,7 @@ func (tg *Target) initProducer(top Topics) (*kgo.Client, error) {
     } else {
         tlsOpt, err := getTLSConfig(tg.pCfg.CACertPath, tg.pCfg.ClientCertPath, tg.pCfg.ClientKeyPath)
         if err != nil {
+            tg.metrics.GetOrCreateCounter(fmt.Sprintf(TargetKafkaErrMetric, "tls config error")).Inc()
             return nil, err
         }
 
@@ -188,6 +189,7 @@ outerLoop:
     default:
         cl, err = kgo.NewClient(opts...)
         if err != nil {
+            tg.metrics.GetOrCreateCounter(fmt.Sprintf(TargetNetworkErrMetric, "error creating producer client")).Inc()
             tg.log.Error("error creating producer client", "error", err)
             retries++
             waitTries(tg.ctx, backoff(retries))
@@ -208,6 +210,7 @@ outerLoop:
 
         // Test connectivity and ensure destination topics exists.
         if err := testConnection(cl, tg.pCfg.SessionTimeout, topics, partitions); err != nil {
+            tg.metrics.GetOrCreateCounter(fmt.Sprintf(TargetNetworkErrMetric, "error connecting to producer")).Inc()
             tg.log.Error("error connecting to producer", "err", err)
             retries++
             waitTries(tg.ctx, backoff(retries))
@@ -295,13 +298,14 @@ retry:
             destTopic = tg.targetTopics[res.Record.Topic]
             part      = res.Record.Partition
         )
-        tg.metrics.GetOrCreateCounter(fmt.Sprintf(RelayMetric, srcTopic, destTopic, part)).Inc()
+        tg.metrics.GetOrCreateCounter(fmt.Sprintf(RelayedMsgsMetric, srcTopic, destTopic, part)).Inc()
     }
 
     tg.log.Debug("produced last offset", "offset", results[len(results)-1].Record.Offset, "batch", batchLen, "retry", retries)
 
     // retry if there is an error
     if err != nil {
+        tg.metrics.GetOrCreateCounter(fmt.Sprintf(TargetKafkaErrMetric, "error producing message")).Inc()
         tg.log.Error("error producing message", "err", err, "failed_count", batchLen, "retry", retries)
 
         bufRecs := tg.client.BufferedProduceRecords()
@@ -333,6 +337,7 @@ retry:
     }
 
     if !sent {
+        tg.metrics.GetOrCreateCounter(fmt.Sprintf(TargetKafkaErrMetric, "error producing message after retries")).Inc()
         return fmt.Errorf("error producing message; exhausted retries (%v)", tg.pCfg.MaxRetries)
     }
 

2 changes: 1 addition & 1 deletion main.go
@@ -64,7 +64,7 @@ func main() {
     }
 
     // Initialize the source Kafka (consumer) relay.
-    srcPool, err := relay.NewSourcePool(initSourcePoolConfig(ko), consumerCfgs, topics, lo)
+    srcPool, err := relay.NewSourcePool(initSourcePoolConfig(ko), consumerCfgs, topics, metr, lo)
     if err != nil {
         log.Fatalf("error initializing source pool controller: %v", err)
     }
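For context, a hypothetical sketch of how the metrics.Set passed here (metr) might be created and exposed over HTTP. Only the extra NewSourcePool argument comes from the diff above; the /metrics path, the listen address, and the handler wiring are assumptions, since this commit does not show how the repository serves its metrics:

```go
package main

import (
    "log"
    "net/http"

    "github.com/VictoriaMetrics/metrics"
)

func main() {
    // Shared set handed to the source pool and target, shown as `metr` in the
    // diff above; the rest of main() (configs, logger, relay wiring) is elided.
    metr := metrics.NewSet()

    // e.g. srcPool, err := relay.NewSourcePool(initSourcePoolConfig(ko), consumerCfgs, topics, metr, lo)

    // Expose everything registered on the set in Prometheus text format.
    http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        metr.WritePrometheus(w)
    })
    log.Fatal(http.ListenAndServe(":7081", nil))
}
```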
