From 1e7860938548be03d93c46fe848c03c0e21385ea Mon Sep 17 00:00:00 2001 From: okankoAMZ <107267850+okankoAMZ@users.noreply.github.com> Date: Thu, 14 Nov 2024 09:14:54 -0800 Subject: [PATCH] Target Allocator Support for Telegraf Based Prometheus Receiver (#1394) --- .../prometheus/metrics_receiver_test.go | 59 +++++ plugins/inputs/prometheus/start.go | 104 ++++++--- plugins/inputs/prometheus/target_allocator.go | 204 ++++++++++++++++++ .../inputs/prometheus/testdata/base-k8.yaml | 11 + .../prometheus/testdata/target_allocator.yaml | 7 + .../sampleConfig/prometheus_ta_config.yaml | 12 ++ translator/tocwconfig/tocwconfig_test.go | 23 ++ 7 files changed, 386 insertions(+), 34 deletions(-) create mode 100644 plugins/inputs/prometheus/target_allocator.go create mode 100644 plugins/inputs/prometheus/testdata/base-k8.yaml create mode 100644 plugins/inputs/prometheus/testdata/target_allocator.yaml create mode 100644 translator/tocwconfig/sampleConfig/prometheus_ta_config.yaml diff --git a/plugins/inputs/prometheus/metrics_receiver_test.go b/plugins/inputs/prometheus/metrics_receiver_test.go index 6a813c1a2f..8865798cb8 100644 --- a/plugins/inputs/prometheus/metrics_receiver_test.go +++ b/plugins/inputs/prometheus/metrics_receiver_test.go @@ -4,8 +4,13 @@ package prometheus import ( + "os" + "path/filepath" "testing" + kitlog "github.com/go-kit/log" + "github.com/prometheus/common/promlog" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/stretchr/testify/assert" @@ -110,3 +115,57 @@ func Test_metricAppender_Commit(t *testing.T) { } assert.Equal(t, expected, *pmb[0]) } + +func Test_loadConfigFromFileWithTargetAllocator(t *testing.T) { + os.Setenv("POD_NAME", "collector-1") + defer os.Unsetenv("POD_NAME") + configFile := filepath.Join("testdata", "target_allocator.yaml") + logger := kitlog.NewLogfmtLogger(os.Stdout) + logLevel := promlog.AllowedLevel{} + logLevel.Set("DEBUG") + var reloadHandler = func(cfg *config.Config) error { + logger.Log("reloaded") + return nil + } + taManager := createTargetAllocatorManager(configFile, logger, &logLevel, nil, nil) + err := reloadConfig(configFile, logger, taManager, reloadHandler) + assert.NoError(t, err) + assert.True(t, taManager.enabled) + assert.Equal(t, taManager.config.TargetAllocator.CollectorID, "collector-1") + assert.Equal(t, taManager.config.TargetAllocator.TLSSetting.CAFile, DEFAULT_TLS_CA_FILE_PATH) + +} + +func Test_loadConfigFromFileWithoutTargetAllocator(t *testing.T) { + os.Setenv("POD_NAME", "collector-1") + defer os.Unsetenv("POD_NAME") + configFile := filepath.Join("testdata", "base-k8.yaml") + logLevel := promlog.AllowedLevel{} + logLevel.Set("DEBUG") + logger := kitlog.NewLogfmtLogger(os.Stdout) + var reloadHandler = func(cfg *config.Config) error { + logger.Log("reloaded") + return nil + } + taManager := createTargetAllocatorManager(configFile, logger, &logLevel, nil, nil) + err := reloadConfig(configFile, logger, taManager, reloadHandler) + assert.NoError(t, err) + assert.False(t, taManager.enabled) + +} +func Test_loadConfigFromFileEC2(t *testing.T) { + configFile := filepath.Join("testdata", "base-k8.yaml") + logger := kitlog.NewLogfmtLogger(os.Stdout) + logLevel := promlog.AllowedLevel{} + logLevel.Set("DEBUG") + var reloadHandler = func(cfg *config.Config) error { + logger.Log("reloaded") + return nil + } + + taManager := createTargetAllocatorManager(configFile, logger, &logLevel, nil, nil) + err := reloadConfig(configFile, logger, taManager, reloadHandler) + assert.NoError(t, err) + assert.False(t, taManager.enabled) + +} diff --git a/plugins/inputs/prometheus/start.go b/plugins/inputs/prometheus/start.go index d5b81e1fe5..00af39f50f 100644 --- a/plugins/inputs/prometheus/start.go +++ b/plugins/inputs/prometheus/start.go @@ -21,6 +21,7 @@ package prometheus import ( "context" + "fmt" "os" "os/signal" "runtime" @@ -101,8 +102,6 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan cfg.configFile = configFilePath logger := promlog.New(&cfg.promlogConfig) - //stdlog.SetOutput(log.NewStdlibAdapter(logger)) - //stdlog.Println("redirect std log") klog.SetLogger(klogr.New().WithName("k8s_client_runtime").V(6)) @@ -116,8 +115,19 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan ctxScrape, cancelScrape = context.WithCancel(context.Background()) sdMetrics, _ = discovery.CreateAndRegisterSDMetrics(prometheus.DefaultRegisterer) discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("scrape")) - scrapeManager, _ = scrape.NewManager(&scrape.Options{}, log.With(logger, "component", "scrape manager"), receiver, prometheus.DefaultRegisterer) + + scrapeManager, _ = scrape.NewManager(&scrape.Options{}, log.With(logger, "component", "scrape manager"), receiver, prometheus.DefaultRegisterer) + taManager = createTargetAllocatorManager(configFilePath, log.With(logger, "component", "ta manager"), logLevel, scrapeManager, discoveryManagerScrape) + ) + + level.Info(logger).Log("msg", fmt.Sprintf("Target Allocator is %t", taManager.enabled)) + //Setup Target Allocator Scrape Post Process Handler + taManager.AttachReloadConfigHandler( + func(prometheusConfig *config.Config) { + relabelScrapeConfigs(prometheusConfig, logger) + }, ) + mth.SetScrapeManager(scrapeManager) var reloaders = []func(cfg *config.Config) error{ @@ -151,7 +161,6 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan close(reloadReady.C) }) } - var g run.Group { // Termination handler. @@ -179,12 +188,13 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan // Scrape discovery manager. g.Add( func() error { + level.Info(logger).Log("msg", "Scrape discovery manager starting") err := discoveryManagerScrape.Run() - level.Info(logger).Log("msg", "Scrape discovery manager stopped") + level.Info(logger).Log("msg", "Scrape discovery manager stopped", "error", err) return err }, func(err error) { - level.Info(logger).Log("msg", "Stopping scrape discovery manager...") + level.Info(logger).Log("msg", "Stopping scrape discovery manager...", "error", err) cancelScrape() }, ) @@ -201,17 +211,35 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan level.Info(logger).Log("msg", "start discovery") err := scrapeManager.Run(discoveryManagerScrape.SyncCh()) - level.Info(logger).Log("msg", "Scrape manager stopped") + level.Info(logger).Log("msg", "Scrape manager stopped", "error", err) return err }, func(err error) { // Scrape manager needs to be stopped before closing the local TSDB // so that it doesn't try to write samples to a closed storage. - level.Info(logger).Log("msg", "Stopping scrape manager...") + level.Info(logger).Log("msg", "Stopping scrape manager...", "error", err) scrapeManager.Stop() }, ) } + { + // Target Allocator manager. + if taManager.enabled { + g.Add( + func() error { + // we wait until the config is fully loaded. + level.Info(logger).Log("msg", "start ta manager") + err := taManager.Run() + level.Info(logger).Log("msg", "ta manager stopped", "error", err) + return err + }, + func(err error) { + level.Info(logger).Log("msg", "Stopping ta manager...", "error", err) + taManager.Shutdown() + }, + ) + } + } { // Reload handler. @@ -227,7 +255,7 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan for { select { case <-hup: - if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil { + if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil { level.Error(logger).Log("msg", "Error reloading config", "err", err) } @@ -257,9 +285,11 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan default: } - + if taManager.enabled { + <-taManager.taReadyCh + } level.Info(logger).Log("msg", "handling config file") - if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil { + if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil { return errors.Wrapf(err, "error loading config from %q", cfg.configFile) } level.Info(logger).Log("msg", "finish handling config file") @@ -288,30 +318,11 @@ const ( savedScrapeNameLabel = "cwagent_saved_scrape_name" // just arbitrary name that end user won't override in relabel config ) -func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (err error) { - level.Info(logger).Log("msg", "Loading configuration file", "filename", filename) - content, _ := os.ReadFile(filename) - text := string(content) - level.Debug(logger).Log("msg", "Prometheus configuration file", "value", text) - - defer func() { - if err == nil { - configSuccess.Set(1) - configSuccessTime.SetToCurrentTime() - } else { - configSuccess.Set(0) - } - }() - - conf, err := config.LoadFile(filename, false, false, logger) - if err != nil { - return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename) - } - +func relabelScrapeConfigs(prometheusConfig *config.Config, logger log.Logger) { // For saving name before relabel // - __name__ https://github.com/aws/amazon-cloudwatch-agent/issues/190 // - job and instance https://github.com/aws/amazon-cloudwatch-agent/issues/193 - for _, sc := range conf.ScrapeConfigs { + for _, sc := range prometheusConfig.ScrapeConfigs { relabelConfigs := []*relabel.Config{ // job { @@ -331,13 +342,38 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config }, } - level.Info(logger).Log("msg", "Add extra relabel_configs and metric_relabel_configs to save job, instance and __name__ before user relabel") + level.Debug(logger).Log("msg", "Add extra relabel_configs and metric_relabel_configs to save job, instance and __name__ before user relabel") sc.RelabelConfigs = append(relabelConfigs, sc.RelabelConfigs...) sc.MetricRelabelConfigs = append(metricNameRelabelConfigs, sc.MetricRelabelConfigs...) - } +} +func reloadConfig(filename string, logger log.Logger, taManager *TargetAllocatorManager, rls ...func(*config.Config) error) (err error) { + level.Info(logger).Log("msg", "Loading configuration file", "filename", filename) + content, _ := os.ReadFile(filename) + text := string(content) + level.Debug(logger).Log("msg", "Prometheus configuration file", "value", text) + defer func() { + if err == nil { + configSuccess.Set(1) + configSuccessTime.SetToCurrentTime() + } else { + configSuccess.Set(0) + } + }() + // Check for TA + var conf *config.Config + if taManager.enabled { + level.Info(logger).Log("msg", "Target Allocator is enabled") + conf = (*config.Config)(taManager.config.PrometheusConfig) + } else { + conf, err = config.LoadFile(filename, false, false, logger) + if err != nil { + return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename) + } + } + relabelScrapeConfigs(conf, logger) failed := false for _, rl := range rls { if err := rl(conf); err != nil { diff --git a/plugins/inputs/prometheus/target_allocator.go b/plugins/inputs/prometheus/target_allocator.go new file mode 100644 index 0000000000..fc32931d27 --- /dev/null +++ b/plugins/inputs/prometheus/target_allocator.go @@ -0,0 +1,204 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package prometheus + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + otelpromreceiver "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver" + tamanager "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/targetallocator" + "github.com/prometheus/common/promlog" + promconfig "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery" + "github.com/prometheus/prometheus/scrape" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "gopkg.in/yaml.v3" + + "github.com/aws/amazon-cloudwatch-agent/cfg/envconfig" +) + +var DEFAULT_TLS_CA_FILE_PATH = filepath.Join("/etc", "amazon-cloudwatch-observability-agent-cert", "tls-ca.crt") + +const DEFAULT_TLS_RELOAD_INTERVAL_SECONDS = 10 * time.Second + +type TargetAllocatorManager struct { + enabled bool + host component.Host + shutdownCh chan struct{} + taReadyCh chan struct{} + reloadConfigHandler func(config *promconfig.Config) + manager *tamanager.Manager + config *otelpromreceiver.Config + sm *scrape.Manager + dm *discovery.Manager + logger log.Logger +} + +func isPodNameAvailable() bool { + podName := os.Getenv(envconfig.PodName) + if podName == "" { + return false + } + return true +} +func loadConfigFromFilename(filename string) (*otelpromreceiver.Config, error) { + yamlFile, err := os.ReadFile(filename) + if err != nil { + return nil, err + } + var stringMap map[string]interface{} + err = yaml.Unmarshal(yamlFile, &stringMap) + if err != nil { + return nil, err + } + componentParser := confmap.NewFromStringMap(stringMap) + if componentParser == nil { + return nil, fmt.Errorf("unable to parse config from filename %s", filename) + } + var cfg otelpromreceiver.Config + err = componentParser.Unmarshal(&cfg) + if err != nil { + return nil, err + } + return &cfg, nil +} + +// Adapter from go-kit/log to zap.Logger +func createLogger(level *promlog.AllowedLevel) (*zap.Logger, error) { + zapLevel, err := zapcore.ParseLevel(level.String()) + if err != nil { + err = fmt.Errorf("Error parsing level: %v. Defaulting to info.", err) + zapLevel = zapcore.InfoLevel + } + // Create a base zap logger (you can customize it as needed) + zapCore := zapcore.NewCore( + zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), // Use JSON encoder for zap + zapcore.AddSync(os.Stdout), // Output to stdout + zapLevel, // Set log level to Debug + ) + // Create the zap logger + zapLogger := zap.New(zapCore) + return zapLogger, err +} + +func createTargetAllocatorManager(filename string, logger log.Logger, logLevel *promlog.AllowedLevel, sm *scrape.Manager, dm *discovery.Manager) *TargetAllocatorManager { + tam := TargetAllocatorManager{ + enabled: false, + manager: nil, + config: nil, + host: nil, + sm: sm, + dm: dm, + shutdownCh: make(chan struct{}, 1), + taReadyCh: make(chan struct{}, 1), + reloadConfigHandler: nil, + logger: logger, + } + err := tam.loadConfig(filename) + if err != nil { + level.Warn(logger).Log("msg", "Could not load config for target allocator from file", "filename", filename, "err", err) + return &tam + } + if tam.config == nil { + return &tam + } + tam.enabled = (tam.config.TargetAllocator != nil) && isPodNameAvailable() + if tam.enabled { + tam.loadManager(logLevel) + } + return &tam +} +func (tam *TargetAllocatorManager) loadManager(logLevel *promlog.AllowedLevel) { + logger, err := createLogger(logLevel) + if err != nil { + level.Error(tam.logger).Log("msg", "Error creating logger", "err", err) + } + receiverSettings := receiver.Settings{ + ID: component.MustNewID(strings.ReplaceAll(tam.config.TargetAllocator.CollectorID, "-", "_")), + TelemetrySettings: component.TelemetrySettings{ + Logger: logger, + TracerProvider: nil, + MeterProvider: nil, + MetricsLevel: 0, + Resource: pcommon.Resource{}, + ReportStatus: nil, + }, + } + + tam.manager = tamanager.NewManager(receiverSettings, tam.config.TargetAllocator, (*promconfig.Config)(tam.config.PrometheusConfig), false) +} +func (tam *TargetAllocatorManager) loadConfig(filename string) error { + config, err := loadConfigFromFilename(filename) + if err != nil { + return err + } + tam.config = config + if tam.config.TargetAllocator == nil { + return nil // no target allocator return + } + //has target allocator + tam.config.TargetAllocator.TLSSetting.CAFile = DEFAULT_TLS_CA_FILE_PATH + tam.config.TargetAllocator.TLSSetting.ReloadInterval = DEFAULT_TLS_RELOAD_INTERVAL_SECONDS + return nil +} +func (tam *TargetAllocatorManager) Run() error { + err := tam.manager.Start(context.Background(), tam.host, tam.sm, tam.dm) + if err != nil { + return err + } + err = tam.reloadConfigTicker() + if err != nil { + tam.manager.Shutdown() + return err + } + // go ahead and let dependencies know TA is ready + close(tam.taReadyCh) + //don't stop until shutdown + <-tam.shutdownCh + return nil +} +func (tam *TargetAllocatorManager) Shutdown() { + tam.manager.Shutdown() + close(tam.shutdownCh) +} +func (tam *TargetAllocatorManager) AttachReloadConfigHandler(handler func(config *promconfig.Config)) { + tam.reloadConfigHandler = handler +} +func (tam *TargetAllocatorManager) reloadConfigTicker() error { + + if tam.config.TargetAllocator == nil { + return level.Error(tam.logger).Log("msg", "target Allocator is not configured properly") + } + if tam.reloadConfigHandler == nil { + return level.Error(tam.logger).Log("msg", "target allocator reload config handler is not configured properly") + } + level.Info(tam.logger).Log("msg", "Starting Target Allocator Reload Config Ticker", "interval", tam.config.TargetAllocator.Interval.Seconds()) + ticker := time.NewTicker(tam.config.TargetAllocator.Interval) + go func() { + for { + select { + case <-ticker.C: + tam.reloadConfigHandler((*promconfig.Config)(tam.config.PrometheusConfig)) + case <-tam.shutdownCh: + ticker.Stop() + // Stop the ticker and exit when stop is signaled + level.Info(tam.logger).Log("msg", "Stopping Target Allocator Reload Config Ticker") + return + } + } + }() + return nil +} diff --git a/plugins/inputs/prometheus/testdata/base-k8.yaml b/plugins/inputs/prometheus/testdata/base-k8.yaml new file mode 100644 index 0000000000..a06f8d737e --- /dev/null +++ b/plugins/inputs/prometheus/testdata/base-k8.yaml @@ -0,0 +1,11 @@ +global: + scrape_interval: 1m + scrape_timeout: 10s +scrape_configs: + - job_name: kubernetes-service-endpoints + sample_limit: 10000 + kubernetes_sd_configs: + - role: pod + relabel_configs: + - action: labelmap + regex: __meta_kubernetes_service_label_(.+) \ No newline at end of file diff --git a/plugins/inputs/prometheus/testdata/target_allocator.yaml b/plugins/inputs/prometheus/testdata/target_allocator.yaml new file mode 100644 index 0000000000..724518eedc --- /dev/null +++ b/plugins/inputs/prometheus/testdata/target_allocator.yaml @@ -0,0 +1,7 @@ +config: + global: + scrape_interval: 15s + scrape_timeout: 10s +target_allocator: + endpoint: http://target-allocator-service:80 + interval: 30s \ No newline at end of file diff --git a/translator/tocwconfig/sampleConfig/prometheus_ta_config.yaml b/translator/tocwconfig/sampleConfig/prometheus_ta_config.yaml new file mode 100644 index 0000000000..78a178075a --- /dev/null +++ b/translator/tocwconfig/sampleConfig/prometheus_ta_config.yaml @@ -0,0 +1,12 @@ +config: + global: + scrape_interval: 5m + scrape_timeout: 5s + scrape_configs: + - job_name: cwagent-ecs-file-sd-config + sample_limit: 10000 + file_sd_configs: + - files: [ '{ecsSdFileName}' ] +target_allocator: + endpoint: http://target-allocator-service:80 + interval: 30s diff --git a/translator/tocwconfig/tocwconfig_test.go b/translator/tocwconfig/tocwconfig_test.go index 6b88852bbb..cbed96ffb3 100644 --- a/translator/tocwconfig/tocwconfig_test.go +++ b/translator/tocwconfig/tocwconfig_test.go @@ -355,7 +355,30 @@ func TestPrometheusConfig(t *testing.T) { checkTranslation(t, "prometheus_config_linux", "linux", expectedEnvVars, "", tokenReplacements) checkTranslation(t, "prometheus_config_windows", "windows", nil, "", tokenReplacements) } +func TestPrometheusConfigwithTargetAllocator(t *testing.T) { + resetContext(t) + context.CurrentContext().SetRunInContainer(true) + context.CurrentContext().SetMode(config.ModeEC2) + t.Setenv(config.HOST_NAME, "host_name_from_env") + temp := t.TempDir() + prometheusConfigFileName := filepath.Join(temp, "prometheus_ta_config.yaml") + ecsSdFileName := filepath.Join(temp, "ecs_sd_results.yaml") + expectedEnvVars := map[string]string{} + tokenReplacements := map[string]string{ + prometheusFileNameToken: strings.ReplaceAll(prometheusConfigFileName, "\\", "\\\\"), + ecsSdFileNameToken: strings.ReplaceAll(ecsSdFileName, "\\", "\\\\"), + } + // Load prometheus config and replace ecs sd results file name token with temp file name + testPrometheusConfig := strings.ReplaceAll(prometheusConfig, "{"+ecsSdFileNameToken+"}", ecsSdFileName) + // Write the modified prometheus config to temp prometheus config file + err := os.WriteFile(prometheusConfigFileName, []byte(testPrometheusConfig), os.ModePerm) + require.NoError(t, err) + // In the following checks, we first load the json and replace tokens with the temp files + // Additionally, before comparing with actual, we again replace tokens with temp files in the expected toml & yaml + checkTranslation(t, "prometheus_config_linux", "linux", expectedEnvVars, "", tokenReplacements) + checkTranslation(t, "prometheus_config_windows", "windows", nil, "", tokenReplacements) +} func TestOtelPrometheusConfig(t *testing.T) { resetContext(t) context.CurrentContext().SetRunInContainer(true)