From 986fdcccc88b76a7655cd5ffa23bd949569a8734 Mon Sep 17 00:00:00 2001 From: Selsabil Mabrouk Date: Thu, 14 Dec 2023 17:17:13 +0100 Subject: [PATCH] Add filter on topic name for metric kafka_topic_log_dir_size_total_bytes --- minion/config_log_dirs.go | 25 +++++++++++++++++++++++++ minion/service.go | 14 ++++++++++---- minion/utils.go | 18 ++++++++++++++++++ prometheus/collect_log_dirs.go | 3 +++ 4 files changed, 56 insertions(+), 4 deletions(-) diff --git a/minion/config_log_dirs.go b/minion/config_log_dirs.go index 6da208b..e461bb4 100644 --- a/minion/config_log_dirs.go +++ b/minion/config_log_dirs.go @@ -1,17 +1,42 @@ package minion +import ( + "fmt" +) + type LogDirsConfig struct { // Enabled specifies whether log dirs shall be scraped and exported or not. This should be disabled for clusters prior // to version 1.0.0 as describing log dirs was not supported back then. Enabled bool `koanf:"enabled"` + // AllowedTopics are regex strings of topic names whose topic metrics that shall be exported. + AllowedTopics []string `koanf:"allowedTopics"` + + // IgnoredTopics are regex strings of topic names that shall be ignored/skipped when exporting metrics. Ignored topics + // take precedence over allowed topics. + IgnoredTopics []string `koanf:"ignoredTopics"` } // Validate if provided LogDirsConfig is valid. func (c *LogDirsConfig) Validate() error { + // Check whether each provided string is valid regex + for _, topic := range c.AllowedTopics { + _, err := compileRegex(topic) + if err != nil { + return fmt.Errorf("allowed topic string '%v' is not valid regex", topic) + } + } + + for _, topic := range c.IgnoredTopics { + _, err := compileRegex(topic) + if err != nil { + return fmt.Errorf("ignored topic string '%v' is not valid regex", topic) + } + } return nil } // SetDefaults for topic config func (c *LogDirsConfig) SetDefaults() { c.Enabled = true + c.AllowedTopics = []string{"/.*/"} } diff --git a/minion/service.go b/minion/service.go index 84efae9..e8922e4 100644 --- a/minion/service.go +++ b/minion/service.go @@ -27,10 +27,12 @@ type Service struct { cache map[string]interface{} cacheLock sync.RWMutex - AllowedGroupIDsExpr []*regexp.Regexp - IgnoredGroupIDsExpr []*regexp.Regexp - AllowedTopicsExpr []*regexp.Regexp - IgnoredTopicsExpr []*regexp.Regexp + AllowedGroupIDsExpr []*regexp.Regexp + IgnoredGroupIDsExpr []*regexp.Regexp + AllowedTopicsExpr []*regexp.Regexp + IgnoredTopicsExpr []*regexp.Regexp + AllowedlogDirsTopicsExpr []*regexp.Regexp + IgnoredlogDirsTopicsExpr []*regexp.Regexp client *kgo.Client storage *Storage @@ -67,6 +69,8 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metrics ignoredGroupIDsExpr, _ := compileRegexes(cfg.ConsumerGroups.IgnoredGroupIDs) allowedTopicsExpr, _ := compileRegexes(cfg.Topics.AllowedTopics) ignoredTopicsExpr, _ := compileRegexes(cfg.Topics.IgnoredTopics) + allowedlogDirsTopicsExpr, _ := compileRegexes(cfg.LogDirs.AllowedTopics) + ignoredlogDirsTopicsExpr, _ := compileRegexes(cfg.LogDirs.IgnoredTopics) service := &Service{ Cfg: cfg, @@ -80,6 +84,8 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metrics IgnoredGroupIDsExpr: ignoredGroupIDsExpr, AllowedTopicsExpr: allowedTopicsExpr, IgnoredTopicsExpr: ignoredTopicsExpr, + AllowedlogDirsTopicsExpr: allowedlogDirsTopicsExpr, + IgnoredlogDirsTopicsExpr: ignoredlogDirsTopicsExpr, client: client, storage: storage, diff --git a/minion/utils.go b/minion/utils.go index 3049b8f..b18eeeb 100644 --- a/minion/utils.go +++ b/minion/utils.go @@ -42,6 +42,24 @@ func (s *Service) IsTopicAllowed(topicName string) bool { return isAllowed } +func (s *Service) IsLOgDirsTopicAllowed(topicName string) bool { + isAllowed := false + for _, regex := range s.AllowedlogDirsTopicsExpr { + if regex.MatchString(topicName) { + isAllowed = true + break + } + } + + for _, regex := range s.IgnoredlogDirsTopicsExpr { + if regex.MatchString(topicName) { + isAllowed = false + break + } + } + return isAllowed +} + func compileRegex(expr string) (*regexp.Regexp, error) { if strings.HasPrefix(expr, "/") && strings.HasSuffix(expr, "/") { substr := expr[1 : len(expr)-1] diff --git a/prometheus/collect_log_dirs.go b/prometheus/collect_log_dirs.go index 7794f9c..7a7774c 100644 --- a/prometheus/collect_log_dirs.go +++ b/prometheus/collect_log_dirs.go @@ -39,6 +39,9 @@ func (e *Exporter) collectLogDirs(ctx context.Context, ch chan<- prometheus.Metr continue } for _, topic := range dir.Topics { + if !e.minionSvc.IsLOgDirsTopicAllowed(topic.Topic) { + continue + } topicSize := int64(0) for _, partition := range topic.Partitions { topicSize += partition.Size