Skip to content

Commit

Permalink
Target Allocator Support for Telegraf Based Prometheus Receiver (#1394)
Browse files Browse the repository at this point in the history
  • Loading branch information
okankoAMZ authored Nov 14, 2024
1 parent b979421 commit 1e78609
Show file tree
Hide file tree
Showing 7 changed files with 386 additions and 34 deletions.
59 changes: 59 additions & 0 deletions plugins/inputs/prometheus/metrics_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@
package prometheus

import (
"os"
"path/filepath"
"testing"

kitlog "github.com/go-kit/log"
"github.com/prometheus/common/promlog"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -110,3 +115,57 @@ func Test_metricAppender_Commit(t *testing.T) {
}
assert.Equal(t, expected, *pmb[0])
}

func Test_loadConfigFromFileWithTargetAllocator(t *testing.T) {
os.Setenv("POD_NAME", "collector-1")
defer os.Unsetenv("POD_NAME")
configFile := filepath.Join("testdata", "target_allocator.yaml")
logger := kitlog.NewLogfmtLogger(os.Stdout)
logLevel := promlog.AllowedLevel{}
logLevel.Set("DEBUG")
var reloadHandler = func(cfg *config.Config) error {
logger.Log("reloaded")
return nil
}
taManager := createTargetAllocatorManager(configFile, logger, &logLevel, nil, nil)
err := reloadConfig(configFile, logger, taManager, reloadHandler)
assert.NoError(t, err)
assert.True(t, taManager.enabled)
assert.Equal(t, taManager.config.TargetAllocator.CollectorID, "collector-1")
assert.Equal(t, taManager.config.TargetAllocator.TLSSetting.CAFile, DEFAULT_TLS_CA_FILE_PATH)

}

func Test_loadConfigFromFileWithoutTargetAllocator(t *testing.T) {
os.Setenv("POD_NAME", "collector-1")
defer os.Unsetenv("POD_NAME")
configFile := filepath.Join("testdata", "base-k8.yaml")
logLevel := promlog.AllowedLevel{}
logLevel.Set("DEBUG")
logger := kitlog.NewLogfmtLogger(os.Stdout)
var reloadHandler = func(cfg *config.Config) error {
logger.Log("reloaded")
return nil
}
taManager := createTargetAllocatorManager(configFile, logger, &logLevel, nil, nil)
err := reloadConfig(configFile, logger, taManager, reloadHandler)
assert.NoError(t, err)
assert.False(t, taManager.enabled)

}
func Test_loadConfigFromFileEC2(t *testing.T) {
configFile := filepath.Join("testdata", "base-k8.yaml")
logger := kitlog.NewLogfmtLogger(os.Stdout)
logLevel := promlog.AllowedLevel{}
logLevel.Set("DEBUG")
var reloadHandler = func(cfg *config.Config) error {
logger.Log("reloaded")
return nil
}

taManager := createTargetAllocatorManager(configFile, logger, &logLevel, nil, nil)
err := reloadConfig(configFile, logger, taManager, reloadHandler)
assert.NoError(t, err)
assert.False(t, taManager.enabled)

}
104 changes: 70 additions & 34 deletions plugins/inputs/prometheus/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package prometheus

import (
"context"
"fmt"
"os"
"os/signal"
"runtime"
Expand Down Expand Up @@ -101,8 +102,6 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
cfg.configFile = configFilePath

logger := promlog.New(&cfg.promlogConfig)
//stdlog.SetOutput(log.NewStdlibAdapter(logger))
//stdlog.Println("redirect std log")

klog.SetLogger(klogr.New().WithName("k8s_client_runtime").V(6))

Expand All @@ -116,8 +115,19 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
ctxScrape, cancelScrape = context.WithCancel(context.Background())
sdMetrics, _ = discovery.CreateAndRegisterSDMetrics(prometheus.DefaultRegisterer)
discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("scrape"))
scrapeManager, _ = scrape.NewManager(&scrape.Options{}, log.With(logger, "component", "scrape manager"), receiver, prometheus.DefaultRegisterer)

scrapeManager, _ = scrape.NewManager(&scrape.Options{}, log.With(logger, "component", "scrape manager"), receiver, prometheus.DefaultRegisterer)
taManager = createTargetAllocatorManager(configFilePath, log.With(logger, "component", "ta manager"), logLevel, scrapeManager, discoveryManagerScrape)
)

level.Info(logger).Log("msg", fmt.Sprintf("Target Allocator is %t", taManager.enabled))
//Setup Target Allocator Scrape Post Process Handler
taManager.AttachReloadConfigHandler(
func(prometheusConfig *config.Config) {
relabelScrapeConfigs(prometheusConfig, logger)
},
)

mth.SetScrapeManager(scrapeManager)

var reloaders = []func(cfg *config.Config) error{
Expand Down Expand Up @@ -151,7 +161,6 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
close(reloadReady.C)
})
}

var g run.Group
{
// Termination handler.
Expand Down Expand Up @@ -179,12 +188,13 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
// Scrape discovery manager.
g.Add(
func() error {
level.Info(logger).Log("msg", "Scrape discovery manager starting")
err := discoveryManagerScrape.Run()
level.Info(logger).Log("msg", "Scrape discovery manager stopped")
level.Info(logger).Log("msg", "Scrape discovery manager stopped", "error", err)
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
level.Info(logger).Log("msg", "Stopping scrape discovery manager...", "error", err)
cancelScrape()
},
)
Expand All @@ -201,17 +211,35 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan

level.Info(logger).Log("msg", "start discovery")
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
level.Info(logger).Log("msg", "Scrape manager stopped", "error", err)
return err
},
func(err error) {
// Scrape manager needs to be stopped before closing the local TSDB
// so that it doesn't try to write samples to a closed storage.
level.Info(logger).Log("msg", "Stopping scrape manager...")
level.Info(logger).Log("msg", "Stopping scrape manager...", "error", err)
scrapeManager.Stop()
},
)
}
{
// Target Allocator manager.
if taManager.enabled {
g.Add(
func() error {
// we wait until the config is fully loaded.
level.Info(logger).Log("msg", "start ta manager")
err := taManager.Run()
level.Info(logger).Log("msg", "ta manager stopped", "error", err)
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping ta manager...", "error", err)
taManager.Shutdown()
},
)
}
}
{
// Reload handler.

Expand All @@ -227,7 +255,7 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
for {
select {
case <-hup:
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil {
level.Error(logger).Log("msg", "Error reloading config", "err", err)
}

Expand Down Expand Up @@ -257,9 +285,11 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan

default:
}

if taManager.enabled {
<-taManager.taReadyCh
}
level.Info(logger).Log("msg", "handling config file")
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil {
return errors.Wrapf(err, "error loading config from %q", cfg.configFile)
}
level.Info(logger).Log("msg", "finish handling config file")
Expand Down Expand Up @@ -288,30 +318,11 @@ const (
savedScrapeNameLabel = "cwagent_saved_scrape_name" // just arbitrary name that end user won't override in relabel config
)

func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (err error) {
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
content, _ := os.ReadFile(filename)
text := string(content)
level.Debug(logger).Log("msg", "Prometheus configuration file", "value", text)

defer func() {
if err == nil {
configSuccess.Set(1)
configSuccessTime.SetToCurrentTime()
} else {
configSuccess.Set(0)
}
}()

conf, err := config.LoadFile(filename, false, false, logger)
if err != nil {
return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename)
}

func relabelScrapeConfigs(prometheusConfig *config.Config, logger log.Logger) {
// For saving name before relabel
// - __name__ https://github.com/aws/amazon-cloudwatch-agent/issues/190
// - job and instance https://github.com/aws/amazon-cloudwatch-agent/issues/193
for _, sc := range conf.ScrapeConfigs {
for _, sc := range prometheusConfig.ScrapeConfigs {
relabelConfigs := []*relabel.Config{
// job
{
Expand All @@ -331,13 +342,38 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config
},
}

level.Info(logger).Log("msg", "Add extra relabel_configs and metric_relabel_configs to save job, instance and __name__ before user relabel")
level.Debug(logger).Log("msg", "Add extra relabel_configs and metric_relabel_configs to save job, instance and __name__ before user relabel")

sc.RelabelConfigs = append(relabelConfigs, sc.RelabelConfigs...)
sc.MetricRelabelConfigs = append(metricNameRelabelConfigs, sc.MetricRelabelConfigs...)

}
}
func reloadConfig(filename string, logger log.Logger, taManager *TargetAllocatorManager, rls ...func(*config.Config) error) (err error) {
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
content, _ := os.ReadFile(filename)
text := string(content)
level.Debug(logger).Log("msg", "Prometheus configuration file", "value", text)

defer func() {
if err == nil {
configSuccess.Set(1)
configSuccessTime.SetToCurrentTime()
} else {
configSuccess.Set(0)
}
}()
// Check for TA
var conf *config.Config
if taManager.enabled {
level.Info(logger).Log("msg", "Target Allocator is enabled")
conf = (*config.Config)(taManager.config.PrometheusConfig)
} else {
conf, err = config.LoadFile(filename, false, false, logger)
if err != nil {
return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename)
}
}
relabelScrapeConfigs(conf, logger)
failed := false
for _, rl := range rls {
if err := rl(conf); err != nil {
Expand Down
Loading

0 comments on commit 1e78609

Please sign in to comment.