diff --git a/.chloggen/refactor-target-allocator.yaml b/.chloggen/refactor-target-allocator.yaml new file mode 100644 index 000000000000..ea632a87843a --- /dev/null +++ b/.chloggen/refactor-target-allocator.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: prometheusreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Move the TargetAllocator configuration struct to an internal directory + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33146] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/receiver/prometheusreceiver/config.go b/receiver/prometheusreceiver/config.go index c4269efde038..2d63c07bdb76 100644 --- a/receiver/prometheusreceiver/config.go +++ b/receiver/prometheusreceiver/config.go @@ -6,19 +6,17 @@ package prometheusreceiver // import "github.com/open-telemetry/opentelemetry-co import ( "errors" "fmt" - "net/url" "os" "sort" "strings" - "time" commonconfig "github.com/prometheus/common/config" promconfig "github.com/prometheus/prometheus/config" - promHTTP "github.com/prometheus/prometheus/discovery/http" "github.com/prometheus/prometheus/discovery/kubernetes" - "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/confmap" "gopkg.in/yaml.v2" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/targetallocator" ) // Config defines configuration for Prometheus receiver. @@ -37,7 +35,7 @@ type Config struct { // ReportExtraScrapeMetrics - enables reporting of additional metrics for Prometheus client like scrape_body_size_bytes ReportExtraScrapeMetrics bool `mapstructure:"report_extra_scrape_metrics"` - TargetAllocator *TargetAllocator `mapstructure:"target_allocator"` + TargetAllocator *targetallocator.Config `mapstructure:"target_allocator"` } // Validate checks the receiver configuration is valid. 
@@ -48,32 +46,12 @@ func (cfg *Config) Validate() error { return nil } -type TargetAllocator struct { - confighttp.ClientConfig `mapstructure:",squash"` - Interval time.Duration `mapstructure:"interval"` - CollectorID string `mapstructure:"collector_id"` - HTTPSDConfig *PromHTTPSDConfig `mapstructure:"http_sd_config"` - HTTPScrapeConfig *PromHTTPClientConfig `mapstructure:"http_scrape_config"` -} - -func (cfg *TargetAllocator) Validate() error { - // ensure valid endpoint - if _, err := url.ParseRequestURI(cfg.Endpoint); err != nil { - return fmt.Errorf("TargetAllocator endpoint is not valid: %s", cfg.Endpoint) - } - // ensure valid collectorID without variables - if cfg.CollectorID == "" || strings.Contains(cfg.CollectorID, "${") { - return fmt.Errorf("CollectorID is not a valid ID") - } - - return nil -} - // PromConfig is a redeclaration of promconfig.Config because we need custom unmarshaling // as prometheus "config" uses `yaml` tags. type PromConfig promconfig.Config var _ confmap.Unmarshaler = (*PromConfig)(nil) +var _ confmap.Marshaler = (*PromConfig)(nil) func (cfg *PromConfig) Unmarshal(componentParser *confmap.Conf) error { cfgMap := componentParser.ToStringMap() @@ -83,6 +61,20 @@ func (cfg *PromConfig) Unmarshal(componentParser *confmap.Conf) error { return unmarshalYAML(cfgMap, (*promconfig.Config)(cfg)) } +func (cfg PromConfig) Marshal(componentParser *confmap.Conf) error { + yamlOut, err := yaml.Marshal(cfg) + if err != nil { + return fmt.Errorf("prometheus receiver: failed to marshal config to yaml: %w", err) + } + + var result map[string]any + err = yaml.UnmarshalStrict(yamlOut, &result) + if err != nil { + return fmt.Errorf("prometheus receiver: failed to unmarshal yaml to prometheus config object: %w", err) + } + return componentParser.Merge(confmap.NewFromStringMap(result)) +} + func (cfg *PromConfig) Validate() error { // Reject features that Prometheus supports but that the receiver doesn't support: // See: @@ -126,43 +118,6 @@ func (cfg *PromConfig) Validate() error { return nil } -// PromHTTPSDConfig is a redeclaration of promHTTP.SDConfig because we need custom unmarshaling -// as prometheus "config" uses `yaml` tags. -type PromHTTPSDConfig promHTTP.SDConfig - -var _ confmap.Unmarshaler = (*PromHTTPSDConfig)(nil) - -func (cfg *PromHTTPSDConfig) Unmarshal(componentParser *confmap.Conf) error { - cfgMap := componentParser.ToStringMap() - if len(cfgMap) == 0 { - return nil - } - cfgMap["url"] = "http://placeholder" // we have to set it as else marshaling will fail - return unmarshalYAML(cfgMap, (*promHTTP.SDConfig)(cfg)) -} - -type PromHTTPClientConfig commonconfig.HTTPClientConfig - -var _ confmap.Unmarshaler = (*PromHTTPClientConfig)(nil) - -func (cfg *PromHTTPClientConfig) Unmarshal(componentParser *confmap.Conf) error { - cfgMap := componentParser.ToStringMap() - if len(cfgMap) == 0 { - return nil - } - return unmarshalYAML(cfgMap, (*commonconfig.HTTPClientConfig)(cfg)) -} - -func (cfg *PromHTTPClientConfig) Validate() error { - httpCfg := (*commonconfig.HTTPClientConfig)(cfg) - if err := validateHTTPClientConfig(httpCfg); err != nil { - return err - } - // Prometheus UnmarshalYaml implementation by default calls Validate, - // but it is safer to do it here as well. 
- return httpCfg.Validate() -} - func unmarshalYAML(in map[string]any, out any) error { yamlOut, err := yaml.Marshal(in) if err != nil { diff --git a/receiver/prometheusreceiver/config_test.go b/receiver/prometheusreceiver/config_test.go index 6ba48222ce92..06de1c74dfc4 100644 --- a/receiver/prometheusreceiver/config_test.go +++ b/receiver/prometheusreceiver/config_test.go @@ -104,6 +104,22 @@ func TestLoadTargetAllocatorConfig(t *testing.T) { assert.Equal(t, 1, len(r2.PrometheusConfig.ScrapeConfigs)) assert.Equal(t, "demo", r2.PrometheusConfig.ScrapeConfigs[0].JobName) assert.Equal(t, promModel.Duration(5*time.Second), r2.PrometheusConfig.ScrapeConfigs[0].ScrapeInterval) + + sub, err = cm.Sub(component.NewIDWithName(metadata.Type, "k8Setup").String()) + require.NoError(t, err) + + t.Setenv("POD_NAME", "collector-1") + cfg = factory.CreateDefaultConfig() + require.NoError(t, sub.Unmarshal(cfg)) + require.NoError(t, component.ValidateConfig(cfg)) + + r3 := cfg.(*Config) + assert.Equal(t, "https://target-allocator-service:80", r3.TargetAllocator.Endpoint) + assert.Equal(t, 30*time.Second, r3.TargetAllocator.Interval) + assert.Equal(t, "collector-1", r3.TargetAllocator.CollectorID) + assert.Equal(t, promModel.Duration(15*time.Second), r3.PrometheusConfig.GlobalConfig.ScrapeInterval) + assert.Equal(t, promModel.Duration(10*time.Second), r3.PrometheusConfig.GlobalConfig.ScrapeTimeout) + } func TestLoadConfigFailsOnUnknownSection(t *testing.T) { @@ -341,34 +357,3 @@ func TestFileSDConfigWithoutSDFile(t *testing.T) { require.NoError(t, component.ValidateConfig(cfg)) } - -func TestPromHTTPClientConfigValidateAuthorization(t *testing.T) { - cfg := PromHTTPClientConfig{} - require.NoError(t, component.ValidateConfig(cfg)) - cfg.Authorization = &promConfig.Authorization{} - require.NoError(t, component.ValidateConfig(cfg)) - cfg.Authorization.CredentialsFile = "none" - require.Error(t, component.ValidateConfig(cfg)) - cfg.Authorization.CredentialsFile = filepath.Join("testdata", "dummy-tls-cert-file") - require.NoError(t, component.ValidateConfig(cfg)) -} - -func TestPromHTTPClientConfigValidateTLSConfig(t *testing.T) { - cfg := PromHTTPClientConfig{} - require.NoError(t, component.ValidateConfig(cfg)) - cfg.TLSConfig.CertFile = "none" - require.Error(t, component.ValidateConfig(cfg)) - cfg.TLSConfig.CertFile = filepath.Join("testdata", "dummy-tls-cert-file") - cfg.TLSConfig.KeyFile = "none" - require.Error(t, component.ValidateConfig(cfg)) - cfg.TLSConfig.KeyFile = filepath.Join("testdata", "dummy-tls-key-file") - require.NoError(t, component.ValidateConfig(cfg)) -} - -func TestPromHTTPClientConfigValidateMain(t *testing.T) { - cfg := PromHTTPClientConfig{} - require.NoError(t, component.ValidateConfig(cfg)) - cfg.BearerToken = "foo" - cfg.BearerTokenFile = filepath.Join("testdata", "dummy-tls-key-file") - require.Error(t, component.ValidateConfig(cfg)) -} diff --git a/receiver/prometheusreceiver/go.mod b/receiver/prometheusreceiver/go.mod index 9582657a88a2..a523f6adcda9 100644 --- a/receiver/prometheusreceiver/go.mod +++ b/receiver/prometheusreceiver/go.mod @@ -17,6 +17,8 @@ require ( github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.103.0 go.opentelemetry.io/collector/config/confighttp v0.103.0 + go.opentelemetry.io/collector/config/configopaque v1.10.0 + go.opentelemetry.io/collector/config/configtls v0.103.0 go.opentelemetry.io/collector/confmap v0.103.0 go.opentelemetry.io/collector/confmap/provider/fileprovider v0.103.0 go.opentelemetry.io/collector/consumer 
v0.103.0 @@ -165,10 +167,8 @@ require ( go.opentelemetry.io/collector v0.103.0 // indirect go.opentelemetry.io/collector/config/configauth v0.103.0 // indirect go.opentelemetry.io/collector/config/configcompression v1.10.0 // indirect - go.opentelemetry.io/collector/config/configopaque v1.10.0 // indirect go.opentelemetry.io/collector/config/configretry v0.103.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.103.0 // indirect - go.opentelemetry.io/collector/config/configtls v0.103.0 // indirect go.opentelemetry.io/collector/config/internal v0.103.0 // indirect go.opentelemetry.io/collector/confmap/converter/expandconverter v0.103.0 // indirect go.opentelemetry.io/collector/confmap/provider/envprovider v0.103.0 // indirect diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index 2632610eecd9..458c7c0a85d2 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -4,14 +4,9 @@ package prometheusreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver" import ( - "bytes" "context" "errors" "fmt" - "io" - "net/http" - "net/url" - "os" "reflect" "regexp" "sync" @@ -19,21 +14,18 @@ import ( "unsafe" "github.com/go-kit/log" - "github.com/mitchellh/hashstructure/v2" "github.com/prometheus/client_golang/prometheus" commonconfig "github.com/prometheus/common/config" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/config" + promconfig "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" - promHTTP "github.com/prometheus/prometheus/discovery/http" "github.com/prometheus/prometheus/scrape" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" - "gopkg.in/yaml.v2" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/targetallocator" ) const ( @@ -43,33 +35,38 @@ const ( // pReceiver is the type that provides Prometheus scraper/receiver functionality. type pReceiver struct { - cfg *Config - consumer consumer.Metrics - cancelFunc context.CancelFunc - targetAllocatorStop chan struct{} - configLoaded chan struct{} - loadConfigOnce sync.Once - - settings receiver.Settings - scrapeManager *scrape.Manager - discoveryManager *discovery.Manager - httpClient *http.Client - registerer prometheus.Registerer - unregisterMetrics func() - skipOffsetting bool // for testing only + cfg *Config + consumer consumer.Metrics + cancelFunc context.CancelFunc + configLoaded chan struct{} + loadConfigOnce sync.Once + + settings receiver.Settings + scrapeManager *scrape.Manager + discoveryManager *discovery.Manager + targetAllocatorManager *targetallocator.Manager + registerer prometheus.Registerer + unregisterMetrics func() + skipOffsetting bool // for testing only } // New creates a new prometheus.Receiver reference. 
func newPrometheusReceiver(set receiver.Settings, cfg *Config, next consumer.Metrics) *pReceiver { + baseCfg := promconfig.Config(*cfg.PrometheusConfig) pr := &pReceiver{ - cfg: cfg, - consumer: next, - settings: set, - configLoaded: make(chan struct{}), - targetAllocatorStop: make(chan struct{}), + cfg: cfg, + consumer: next, + settings: set, + configLoaded: make(chan struct{}), registerer: prometheus.WrapRegistererWith( prometheus.Labels{"receiver": set.ID.String()}, prometheus.DefaultRegisterer), + targetAllocatorManager: targetallocator.NewManager( + set, + cfg.TargetAllocator, + &baseCfg, + enableNativeHistogramsGate.IsEnabled(), + ), } return pr } @@ -82,34 +79,17 @@ func (r *pReceiver) Start(ctx context.Context, host component.Host) error { logger := internal.NewZapToGokitLogAdapter(r.settings.Logger) - // add scrape configs defined by the collector configs - baseCfg := r.cfg.PrometheusConfig - err := r.initPrometheusComponents(discoveryCtx, logger) if err != nil { r.settings.Logger.Error("Failed to initPrometheusComponents Prometheus components", zap.Error(err)) return err } - err = r.applyCfg(baseCfg) + err = r.targetAllocatorManager.Start(ctx, host, r.scrapeManager, r.discoveryManager) if err != nil { - r.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err)) return err } - allocConf := r.cfg.TargetAllocator - if allocConf != nil { - r.httpClient, err = r.cfg.TargetAllocator.ToClient(ctx, host, r.settings.TelemetrySettings) - if err != nil { - r.settings.Logger.Error("Failed to create http client", zap.Error(err)) - return err - } - err = r.startTargetAllocator(allocConf, baseCfg) - if err != nil { - return err - } - } - r.loadConfigOnce.Do(func() { close(r.configLoaded) }) @@ -117,148 +97,6 @@ func (r *pReceiver) Start(ctx context.Context, host component.Host) error { return nil } -func (r *pReceiver) startTargetAllocator(allocConf *TargetAllocator, baseCfg *PromConfig) error { - r.settings.Logger.Info("Starting target allocator discovery") - // immediately sync jobs, not waiting for the first tick - savedHash, err := r.syncTargetAllocator(uint64(0), allocConf, baseCfg) - if err != nil { - return err - } - go func() { - targetAllocatorIntervalTicker := time.NewTicker(allocConf.Interval) - for { - select { - case <-targetAllocatorIntervalTicker.C: - hash, newErr := r.syncTargetAllocator(savedHash, allocConf, baseCfg) - if newErr != nil { - r.settings.Logger.Error(newErr.Error()) - continue - } - savedHash = hash - case <-r.targetAllocatorStop: - targetAllocatorIntervalTicker.Stop() - r.settings.Logger.Info("Stopping target allocator") - return - } - } - }() - return nil -} - -// syncTargetAllocator request jobs from targetAllocator and update underlying receiver, if the response does not match the provided compareHash. -// baseDiscoveryCfg can be used to provide additional ScrapeConfigs which will be added to the retrieved jobs. 
-func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *TargetAllocator, baseCfg *PromConfig) (uint64, error) { - r.settings.Logger.Debug("Syncing target allocator jobs") - scrapeConfigsResponse, err := r.getScrapeConfigsResponse(allocConf.Endpoint) - if err != nil { - r.settings.Logger.Error("Failed to retrieve job list", zap.Error(err)) - return 0, err - } - - hash, err := hashstructure.Hash(scrapeConfigsResponse, hashstructure.FormatV2, nil) - if err != nil { - r.settings.Logger.Error("Failed to hash job list", zap.Error(err)) - return 0, err - } - if hash == compareHash { - // no update needed - return hash, nil - } - - // Clear out the current configurations - baseCfg.ScrapeConfigs = []*config.ScrapeConfig{} - - for jobName, scrapeConfig := range scrapeConfigsResponse { - var httpSD promHTTP.SDConfig - if allocConf.HTTPSDConfig == nil { - httpSD = promHTTP.SDConfig{ - RefreshInterval: model.Duration(30 * time.Second), - } - } else { - httpSD = promHTTP.SDConfig(*allocConf.HTTPSDConfig) - } - escapedJob := url.QueryEscape(jobName) - httpSD.URL = fmt.Sprintf("%s/jobs/%s/targets?collector_id=%s", allocConf.Endpoint, escapedJob, allocConf.CollectorID) - httpSD.HTTPClientConfig.FollowRedirects = false - scrapeConfig.ServiceDiscoveryConfigs = discovery.Configs{ - &httpSD, - } - - if allocConf.HTTPScrapeConfig != nil { - scrapeConfig.HTTPClientConfig = commonconfig.HTTPClientConfig(*allocConf.HTTPScrapeConfig) - } - - baseCfg.ScrapeConfigs = append(baseCfg.ScrapeConfigs, scrapeConfig) - } - - err = r.applyCfg(baseCfg) - if err != nil { - r.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err)) - return 0, err - } - - return hash, nil -} - -// instantiateShard inserts the SHARD environment variable in the returned configuration -func (r *pReceiver) instantiateShard(body []byte) []byte { - shard, ok := os.LookupEnv("SHARD") - if !ok { - shard = "0" - } - return bytes.ReplaceAll(body, []byte("$(SHARD)"), []byte(shard)) -} - -func (r *pReceiver) getScrapeConfigsResponse(baseURL string) (map[string]*config.ScrapeConfig, error) { - scrapeConfigsURL := fmt.Sprintf("%s/scrape_configs", baseURL) - _, err := url.Parse(scrapeConfigsURL) // check if valid - if err != nil { - return nil, err - } - - resp, err := r.httpClient.Get(scrapeConfigsURL) - if err != nil { - return nil, err - } - - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, err - } - - jobToScrapeConfig := map[string]*config.ScrapeConfig{} - envReplacedBody := r.instantiateShard(body) - err = yaml.Unmarshal(envReplacedBody, &jobToScrapeConfig) - if err != nil { - return nil, err - } - err = resp.Body.Close() - if err != nil { - return nil, err - } - return jobToScrapeConfig, nil -} - -func (r *pReceiver) applyCfg(cfg *PromConfig) error { - if !enableNativeHistogramsGate.IsEnabled() { - // Enforce scraping classic histograms to avoid dropping them. 
- for _, scrapeConfig := range cfg.ScrapeConfigs { - scrapeConfig.ScrapeClassicHistograms = true - } - } - - if err := r.scrapeManager.ApplyConfig((*config.Config)(cfg)); err != nil { - return err - } - - discoveryCfg := make(map[string]discovery.Configs) - for _, scrapeConfig := range cfg.ScrapeConfigs { - discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs - r.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName)) - } - return r.discoveryManager.ApplyConfig(discoveryCfg) -} - func (r *pReceiver) initPrometheusComponents(ctx context.Context, logger log.Logger) error { // Some SD mechanisms use the "refresh" package, which has its own metrics. refreshSdMetrics := discovery.NewRefreshMetrics(r.registerer) @@ -374,7 +212,9 @@ func (r *pReceiver) Shutdown(context.Context) error { if r.scrapeManager != nil { r.scrapeManager.Stop() } - close(r.targetAllocatorStop) + if r.targetAllocatorManager != nil { + r.targetAllocatorManager.Shutdown() + } if r.unregisterMetrics != nil { r.unregisterMetrics() } diff --git a/receiver/prometheusreceiver/targetallocator/config.go b/receiver/prometheusreceiver/targetallocator/config.go new file mode 100644 index 000000000000..8b3c9ee0e82b --- /dev/null +++ b/receiver/prometheusreceiver/targetallocator/config.go @@ -0,0 +1,218 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package targetallocator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/targetallocator" + +import ( + "encoding/base64" + "errors" + "fmt" + "net/url" + "os" + "strings" + "time" + + commonconfig "github.com/prometheus/common/config" + promHTTP "github.com/prometheus/prometheus/discovery/http" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/confmap" + "gopkg.in/yaml.v2" +) + +type Config struct { + confighttp.ClientConfig `mapstructure:",squash"` + Interval time.Duration `mapstructure:"interval"` + CollectorID string `mapstructure:"collector_id"` + HTTPSDConfig *PromHTTPSDConfig `mapstructure:"http_sd_config"` + HTTPScrapeConfig *PromHTTPClientConfig `mapstructure:"http_scrape_config"` +} + +var _ confmap.Unmarshaler = (*Config)(nil) + +func getPodName() string { + return os.Getenv("POD_NAME") +} +func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error { + err := componentParser.Unmarshal(cfg) + if err != nil { + return err + } + if collectorID := cfg.CollectorID; collectorID == "" { + podName := getPodName() + if podName == "" { + return errors.New("POD_NAME env var not found ") + } + cfg.CollectorID = podName + } + return nil +} + +// PromHTTPSDConfig is a redeclaration of promHTTP.SDConfig because we need custom unmarshaling +// as prometheus "config" uses `yaml` tags. 
+type PromHTTPSDConfig promHTTP.SDConfig + +func (cfg *Config) Validate() error { + // ensure valid endpoint + if _, err := url.ParseRequestURI(cfg.Endpoint); err != nil { + return fmt.Errorf("TargetAllocator endpoint is not valid: %s", cfg.Endpoint) + } + // ensure valid collectorID without variables + if cfg.CollectorID == "" || strings.Contains(cfg.CollectorID, "${") { + return fmt.Errorf("CollectorID is not a valid ID") + } + + return nil +} + +var _ confmap.Unmarshaler = (*PromHTTPSDConfig)(nil) + +func (cfg *PromHTTPSDConfig) Unmarshal(componentParser *confmap.Conf) error { + cfgMap := componentParser.ToStringMap() + if len(cfgMap) == 0 { + return nil + } + cfgMap["url"] = "http://placeholder" // we have to set it as else marshaling will fail + return unmarshalYAML(cfgMap, (*promHTTP.SDConfig)(cfg)) +} + +type PromHTTPClientConfig commonconfig.HTTPClientConfig + +var _ confmap.Unmarshaler = (*PromHTTPClientConfig)(nil) + +func (cfg *PromHTTPClientConfig) Unmarshal(componentParser *confmap.Conf) error { + cfgMap := componentParser.ToStringMap() + if len(cfgMap) == 0 { + return nil + } + return unmarshalYAML(cfgMap, (*commonconfig.HTTPClientConfig)(cfg)) +} + +func (cfg *PromHTTPClientConfig) Validate() error { + httpCfg := (*commonconfig.HTTPClientConfig)(cfg) + if err := validateHTTPClientConfig(httpCfg); err != nil { + return err + } + // Prometheus UnmarshalYaml implementation by default calls Validate, + // but it is safer to do it here as well. + return httpCfg.Validate() +} + +func validateHTTPClientConfig(cfg *commonconfig.HTTPClientConfig) error { + if cfg.Authorization != nil { + if err := checkFile(cfg.Authorization.CredentialsFile); err != nil { + return fmt.Errorf("error checking authorization credentials file %q: %w", cfg.Authorization.CredentialsFile, err) + } + } + + if err := checkTLSConfig(cfg.TLSConfig); err != nil { + return err + } + return nil + +} + +func checkFile(fn string) error { + // Nothing set, nothing to error on. + if fn == "" { + return nil + } + _, err := os.Stat(fn) + return err +} + +func checkTLSConfig(tlsConfig commonconfig.TLSConfig) error { + if err := checkFile(tlsConfig.CertFile); err != nil { + return fmt.Errorf("error checking client cert file %q: %w", tlsConfig.CertFile, err) + } + if err := checkFile(tlsConfig.KeyFile); err != nil { + return fmt.Errorf("error checking client key file %q: %w", tlsConfig.KeyFile, err) + } + return nil +} + +func unmarshalYAML(in map[string]any, out any) error { + yamlOut, err := yaml.Marshal(in) + if err != nil { + return fmt.Errorf("prometheus receiver: failed to marshal config to yaml: %w", err) + } + + err = yaml.UnmarshalStrict(yamlOut, out) + if err != nil { + return fmt.Errorf("prometheus receiver: failed to unmarshal yaml to prometheus config object: %w", err) + } + return nil +} + +// convertTLSVersion converts a string TLS version to the corresponding config.TLSVersion value in prometheus common. +func convertTLSVersion(version string) (commonconfig.TLSVersion, error) { + normalizedVersion := "TLS" + strings.ReplaceAll(version, ".", "") + + if tlsVersion, exists := commonconfig.TLSVersions[normalizedVersion]; exists { + return tlsVersion, nil + } + return 0, fmt.Errorf("unsupported TLS version: %s", version) +} + +// configureSDHTTPClientConfigFromTA configures the http client for the service discovery manager +// based on the provided TargetAllocator configuration. 
+func configureSDHTTPClientConfigFromTA(httpSD *promHTTP.SDConfig, allocConf *Config) error { + httpSD.HTTPClientConfig.FollowRedirects = false + + httpSD.HTTPClientConfig.TLSConfig = commonconfig.TLSConfig{ + InsecureSkipVerify: allocConf.TLSSetting.InsecureSkipVerify, + ServerName: allocConf.TLSSetting.ServerName, + CAFile: allocConf.TLSSetting.CAFile, + CertFile: allocConf.TLSSetting.CertFile, + KeyFile: allocConf.TLSSetting.KeyFile, + } + + if allocConf.TLSSetting.CAPem != "" { + decodedCA, err := base64.StdEncoding.DecodeString(string(allocConf.TLSSetting.CAPem)) + if err != nil { + return fmt.Errorf("failed to decode CA: %w", err) + } + httpSD.HTTPClientConfig.TLSConfig.CA = string(decodedCA) + } + + if allocConf.TLSSetting.CertPem != "" { + decodedCert, err := base64.StdEncoding.DecodeString(string(allocConf.TLSSetting.CertPem)) + if err != nil { + return fmt.Errorf("failed to decode Cert: %w", err) + } + httpSD.HTTPClientConfig.TLSConfig.Cert = string(decodedCert) + } + + if allocConf.TLSSetting.KeyPem != "" { + decodedKey, err := base64.StdEncoding.DecodeString(string(allocConf.TLSSetting.KeyPem)) + if err != nil { + return fmt.Errorf("failed to decode Key: %w", err) + } + httpSD.HTTPClientConfig.TLSConfig.Key = commonconfig.Secret(decodedKey) + } + + if allocConf.TLSSetting.MinVersion != "" { + minVersion, err := convertTLSVersion(allocConf.TLSSetting.MinVersion) + if err != nil { + return err + } + httpSD.HTTPClientConfig.TLSConfig.MinVersion = minVersion + } + + if allocConf.TLSSetting.MaxVersion != "" { + maxVersion, err := convertTLSVersion(allocConf.TLSSetting.MaxVersion) + if err != nil { + return err + } + httpSD.HTTPClientConfig.TLSConfig.MaxVersion = maxVersion + } + + if allocConf.ProxyURL != "" { + proxyURL, err := url.Parse(allocConf.ProxyURL) + if err != nil { + return err + } + httpSD.HTTPClientConfig.ProxyURL = commonconfig.URL{URL: proxyURL} + } + + return nil +} diff --git a/receiver/prometheusreceiver/targetallocator/config_test.go b/receiver/prometheusreceiver/targetallocator/config_test.go new file mode 100644 index 000000000000..78ec44017f53 --- /dev/null +++ b/receiver/prometheusreceiver/targetallocator/config_test.go @@ -0,0 +1,83 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package targetallocator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/targetallocator" + +import ( + "path/filepath" + "testing" + "time" + + promConfig "github.com/prometheus/common/config" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/confmap/confmaptest" +) + +func TestComponentConfigStruct(t *testing.T) { + require.NoError(t, componenttest.CheckConfigStruct(Config{})) +} + +func TestLoadTargetAllocatorConfig(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + cfg := &Config{} + + sub, err := cm.Sub("target_allocator") + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(cfg)) + require.NoError(t, component.ValidateConfig(cfg)) + + assert.Equal(t, "http://localhost:8080", cfg.ClientConfig.Endpoint) + assert.Equal(t, 5*time.Second, cfg.ClientConfig.Timeout) + assert.Equal(t, "client.crt", cfg.ClientConfig.TLSSetting.CertFile) + assert.Equal(t, 30*time.Second, cfg.Interval) + assert.Equal(t, "collector-1", cfg.CollectorID) +} +func 
TestLoadTargetAllocatorK8Config(t *testing.T) { + t.Setenv("POD_NAME", "collector-1") + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "k8-config.yaml")) + require.NoError(t, err) + cfg := &Config{} + + sub, err := cm.Sub("target_allocator") + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(cfg)) + require.NoError(t, component.ValidateConfig(cfg)) + + assert.Equal(t, "http://target-allocator-service:80", cfg.ClientConfig.Endpoint) + assert.Equal(t, "collector-1", cfg.CollectorID) +} + +func TestPromHTTPClientConfigValidateAuthorization(t *testing.T) { + cfg := PromHTTPClientConfig{} + require.NoError(t, component.ValidateConfig(cfg)) + cfg.Authorization = &promConfig.Authorization{} + require.NoError(t, component.ValidateConfig(cfg)) + cfg.Authorization.CredentialsFile = "none" + require.Error(t, component.ValidateConfig(cfg)) + cfg.Authorization.CredentialsFile = filepath.Join("testdata", "dummy-tls-cert-file") + require.NoError(t, component.ValidateConfig(cfg)) +} + +func TestPromHTTPClientConfigValidateTLSConfig(t *testing.T) { + cfg := PromHTTPClientConfig{} + require.NoError(t, component.ValidateConfig(cfg)) + cfg.TLSConfig.CertFile = "none" + require.Error(t, component.ValidateConfig(cfg)) + cfg.TLSConfig.CertFile = filepath.Join("testdata", "dummy-tls-cert-file") + cfg.TLSConfig.KeyFile = "none" + require.Error(t, component.ValidateConfig(cfg)) + cfg.TLSConfig.KeyFile = filepath.Join("testdata", "dummy-tls-key-file") + require.NoError(t, component.ValidateConfig(cfg)) +} + +func TestPromHTTPClientConfigValidateMain(t *testing.T) { + cfg := PromHTTPClientConfig{} + require.NoError(t, component.ValidateConfig(cfg)) + cfg.BearerToken = "foo" + cfg.BearerTokenFile = filepath.Join("testdata", "dummy-tls-key-file") + require.Error(t, component.ValidateConfig(cfg)) +} diff --git a/receiver/prometheusreceiver/targetallocator/manager.go b/receiver/prometheusreceiver/targetallocator/manager.go new file mode 100644 index 000000000000..5fe731c9efe4 --- /dev/null +++ b/receiver/prometheusreceiver/targetallocator/manager.go @@ -0,0 +1,247 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package targetallocator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/targetallocator" + +import ( + "bytes" + "context" + "fmt" + "hash/fnv" + "io" + "net/http" + "net/url" + "os" + "sort" + "time" + + commonconfig "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + promconfig "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery" + promHTTP "github.com/prometheus/prometheus/discovery/http" + "github.com/prometheus/prometheus/scrape" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" + "gopkg.in/yaml.v2" +) + +type Manager struct { + settings receiver.Settings + shutdown chan struct{} + cfg *Config + promCfg *promconfig.Config + scrapeManager *scrape.Manager + discoveryManager *discovery.Manager + enableNativeHistograms bool +} + +func NewManager(set receiver.Settings, cfg *Config, promCfg *promconfig.Config, enableNativeHistograms bool) *Manager { + return &Manager{ + shutdown: make(chan struct{}), + settings: set, + cfg: cfg, + promCfg: promCfg, + enableNativeHistograms: enableNativeHistograms, + } +} + +func (m *Manager) Start(ctx context.Context, host component.Host, sm *scrape.Manager, dm *discovery.Manager) error { + m.scrapeManager = sm + m.discoveryManager = dm + err := m.applyCfg() 
+ if err != nil { + m.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err)) + return err + } + if m.cfg == nil { + // the target allocator is disabled + return nil + } + httpClient, err := m.cfg.ClientConfig.ToClient(ctx, host, m.settings.TelemetrySettings) + if err != nil { + m.settings.Logger.Error("Failed to create http client", zap.Error(err)) + return err + } + m.settings.Logger.Info("Starting target allocator discovery") + // immediately sync jobs, not waiting for the first tick + savedHash, err := m.sync(uint64(0), httpClient) + if err != nil { + m.settings.Logger.Error("Failed to sync target allocator", zap.Error(err)) + } + go func() { + targetAllocatorIntervalTicker := time.NewTicker(m.cfg.Interval) + for { + select { + case <-targetAllocatorIntervalTicker.C: + hash, newErr := m.sync(savedHash, httpClient) + if newErr != nil { + m.settings.Logger.Error(newErr.Error()) + continue + } + savedHash = hash + case <-m.shutdown: + targetAllocatorIntervalTicker.Stop() + m.settings.Logger.Info("Stopping target allocator") + return + } + } + }() + return nil +} + +func (m *Manager) Shutdown() { + close(m.shutdown) +} + +// sync request jobs from targetAllocator and update underlying receiver, if the response does not match the provided compareHash. +// baseDiscoveryCfg can be used to provide additional ScrapeConfigs which will be added to the retrieved jobs. +func (m *Manager) sync(compareHash uint64, httpClient *http.Client) (uint64, error) { + m.settings.Logger.Debug("Syncing target allocator jobs") + m.settings.Logger.Debug("endpoint", zap.String("endpoint", m.cfg.Endpoint)) + + scrapeConfigsResponse, err := getScrapeConfigsResponse(httpClient, m.cfg.Endpoint) + if err != nil { + m.settings.Logger.Error("Failed to retrieve job list", zap.Error(err)) + return 0, err + } + + m.settings.Logger.Debug("Scrape configs response", zap.Reflect("scrape_configs", scrapeConfigsResponse)) + hash, err := getScrapeConfigHash(scrapeConfigsResponse) + if err != nil { + m.settings.Logger.Error("Failed to hash job list", zap.Error(err)) + return 0, err + } + if hash == compareHash { + // no update needed + return hash, nil + } + + // Clear out the current configurations + m.promCfg.ScrapeConfigs = []*promconfig.ScrapeConfig{} + + for jobName, scrapeConfig := range scrapeConfigsResponse { + var httpSD promHTTP.SDConfig + if m.cfg.HTTPSDConfig == nil { + httpSD = promHTTP.SDConfig{ + RefreshInterval: model.Duration(30 * time.Second), + } + } else { + httpSD = promHTTP.SDConfig(*m.cfg.HTTPSDConfig) + } + escapedJob := url.QueryEscape(jobName) + httpSD.URL = fmt.Sprintf("%s/jobs/%s/targets?collector_id=%s", m.cfg.Endpoint, escapedJob, m.cfg.CollectorID) + + err = configureSDHTTPClientConfigFromTA(&httpSD, m.cfg) + if err != nil { + m.settings.Logger.Error("Failed to configure http client config", zap.Error(err)) + return 0, err + } + + httpSD.HTTPClientConfig.FollowRedirects = false + scrapeConfig.ServiceDiscoveryConfigs = discovery.Configs{ + &httpSD, + } + + if m.cfg.HTTPScrapeConfig != nil { + scrapeConfig.HTTPClientConfig = commonconfig.HTTPClientConfig(*m.cfg.HTTPScrapeConfig) + } + + m.promCfg.ScrapeConfigs = append(m.promCfg.ScrapeConfigs, scrapeConfig) + } + + err = m.applyCfg() + if err != nil { + m.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err)) + return 0, err + } + + return hash, nil +} + +func (m *Manager) applyCfg() error { + if !m.enableNativeHistograms { + // Enforce scraping classic histograms to avoid dropping them. 
+ for _, scrapeConfig := range m.promCfg.ScrapeConfigs { + scrapeConfig.ScrapeClassicHistograms = true + } + } + + if err := m.scrapeManager.ApplyConfig(m.promCfg); err != nil { + return err + } + + discoveryCfg := make(map[string]discovery.Configs) + for _, scrapeConfig := range m.promCfg.ScrapeConfigs { + discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs + m.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName)) + } + return m.discoveryManager.ApplyConfig(discoveryCfg) +} + +func getScrapeConfigsResponse(httpClient *http.Client, baseURL string) (map[string]*promconfig.ScrapeConfig, error) { + scrapeConfigsURL := fmt.Sprintf("%s/scrape_configs", baseURL) + _, err := url.Parse(scrapeConfigsURL) // check if valid + if err != nil { + return nil, err + } + + resp, err := httpClient.Get(scrapeConfigsURL) + if err != nil { + return nil, err + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + jobToScrapeConfig := map[string]*promconfig.ScrapeConfig{} + envReplacedBody := instantiateShard(body) + err = yaml.Unmarshal(envReplacedBody, &jobToScrapeConfig) + if err != nil { + return nil, err + } + err = resp.Body.Close() + if err != nil { + return nil, err + } + return jobToScrapeConfig, nil +} + +// instantiateShard inserts the SHARD environment variable in the returned configuration +func instantiateShard(body []byte) []byte { + shard, ok := os.LookupEnv("SHARD") + if !ok { + shard = "0" + } + return bytes.ReplaceAll(body, []byte("$(SHARD)"), []byte(shard)) +} + +// Calculate a hash for a scrape config map. +// This is done by marshaling to YAML because it's the most straightforward and doesn't run into problems with unexported fields. +func getScrapeConfigHash(jobToScrapeConfig map[string]*promconfig.ScrapeConfig) (uint64, error) { + var err error + hash := fnv.New64() + yamlEncoder := yaml.NewEncoder(hash) + + jobKeys := make([]string, 0, len(jobToScrapeConfig)) + for jobName := range jobToScrapeConfig { + jobKeys = append(jobKeys, jobName) + } + sort.Strings(jobKeys) + + for _, jobName := range jobKeys { + _, err = hash.Write([]byte(jobName)) + if err != nil { + return 0, err + } + err = yamlEncoder.Encode(jobToScrapeConfig[jobName]) + if err != nil { + return 0, err + } + } + yamlEncoder.Close() + return hash.Sum64(), err +} diff --git a/receiver/prometheusreceiver/metrics_receiver_target_allocator_test.go b/receiver/prometheusreceiver/targetallocator/manager_test.go similarity index 56% rename from receiver/prometheusreceiver/metrics_receiver_target_allocator_test.go rename to receiver/prometheusreceiver/targetallocator/manager_test.go index 69e3c1a7b061..7747f71b27ad 100644 --- a/receiver/prometheusreceiver/metrics_receiver_target_allocator_test.go +++ b/receiver/prometheusreceiver/targetallocator/manager_test.go @@ -3,26 +3,35 @@ //go:build !race -package prometheusreceiver +package targetallocator import ( "context" + "encoding/base64" "encoding/json" "net/http" "net/http/httptest" + "net/url" "strings" "sync" "sync/atomic" "testing" "time" + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" commonconfig "github.com/prometheus/common/config" "github.com/prometheus/common/model" promconfig "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery" promHTTP "github.com/prometheus/prometheus/discovery/http" + "github.com/prometheus/prometheus/model/relabel" + "github.com/prometheus/prometheus/scrape" + "github.com/stretchr/testify/assert" 
"github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/config/configopaque" + "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/receiver/receivertest" ) @@ -50,9 +59,15 @@ type hTTPSDResponse struct { Labels map[model.LabelName]model.LabelValue `json:"labels"` } +type expectedMetricRelabelConfigTestResult struct { + JobName string + MetricRelabelRegex relabel.Regexp +} + type expectedTestResultJobMap struct { - Targets []string - Labels model.LabelSet + Targets []string + Labels model.LabelSet + MetricRelabelConfig *expectedMetricRelabelConfigTestResult } type expectedTestResult struct { @@ -150,6 +165,109 @@ type Responses struct { responses map[string][]mockTargetAllocatorResponseRaw } +func TestGetScrapeConfigHash(t *testing.T) { + jobToScrapeConfig1 := map[string]*promconfig.ScrapeConfig{} + jobToScrapeConfig1["job1"] = &promconfig.ScrapeConfig{ + JobName: "job1", + HonorTimestamps: true, + ScrapeInterval: model.Duration(30 * time.Second), + ScrapeTimeout: model.Duration(30 * time.Second), + MetricsPath: "/metrics", + Scheme: "http", + RelabelConfigs: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"a"}, + TargetLabel: "d", + Action: relabel.KeepEqual, + }, + }, + } + jobToScrapeConfig1["job2"] = &promconfig.ScrapeConfig{ + JobName: "job2", + HonorTimestamps: true, + ScrapeInterval: model.Duration(30 * time.Second), + ScrapeTimeout: model.Duration(30 * time.Second), + MetricsPath: "/metrics", + Scheme: "http", + RelabelConfigs: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"a"}, + TargetLabel: "d", + Action: relabel.KeepEqual, + }, + }, + } + jobToScrapeConfig1["job3"] = &promconfig.ScrapeConfig{ + JobName: "job3", + HonorTimestamps: true, + ScrapeInterval: model.Duration(30 * time.Second), + ScrapeTimeout: model.Duration(30 * time.Second), + MetricsPath: "/metrics", + Scheme: "http", + RelabelConfigs: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"a"}, + TargetLabel: "d", + Action: relabel.KeepEqual, + }, + }, + } + jobToScrapeConfig2 := map[string]*promconfig.ScrapeConfig{} + jobToScrapeConfig2["job2"] = &promconfig.ScrapeConfig{ + JobName: "job2", + HonorTimestamps: true, + ScrapeInterval: model.Duration(30 * time.Second), + ScrapeTimeout: model.Duration(30 * time.Second), + MetricsPath: "/metrics", + Scheme: "http", + RelabelConfigs: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"a"}, + TargetLabel: "d", + Action: relabel.KeepEqual, + }, + }, + } + jobToScrapeConfig2["job1"] = &promconfig.ScrapeConfig{ + JobName: "job1", + HonorTimestamps: true, + ScrapeInterval: model.Duration(30 * time.Second), + ScrapeTimeout: model.Duration(30 * time.Second), + MetricsPath: "/metrics", + Scheme: "http", + RelabelConfigs: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"a"}, + TargetLabel: "d", + Action: relabel.KeepEqual, + }, + }, + } + jobToScrapeConfig2["job3"] = &promconfig.ScrapeConfig{ + JobName: "job3", + HonorTimestamps: true, + ScrapeInterval: model.Duration(30 * time.Second), + ScrapeTimeout: model.Duration(30 * time.Second), + MetricsPath: "/metrics", + Scheme: "http", + RelabelConfigs: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"a"}, + TargetLabel: "d", + Action: relabel.KeepEqual, + }, + }, + } + + hash1, err := getScrapeConfigHash(jobToScrapeConfig1) + require.NoError(t, err) + + hash2, err := getScrapeConfigHash(jobToScrapeConfig2) + require.NoError(t, err) + + 
assert.Equal(t, hash1, hash2) +} + func TestTargetAllocatorJobRetrieval(t *testing.T) { for _, tc := range []struct { desc string @@ -167,6 +285,7 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { "job_name": "job1", "scrape_interval": "30s", "scrape_timeout": "30s", + "scrape_protocols": []string{"OpenMetricsText1.0.0", "OpenMetricsText0.0.1", "PrometheusText0.0.4"}, "metrics_path": "/metrics", "scheme": "http", "relabel_configs": nil, @@ -176,6 +295,7 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { "job_name": "job2", "scrape_interval": "30s", "scrape_timeout": "30s", + "scrape_protocols": []string{"OpenMetricsText1.0.0", "OpenMetricsText0.0.1", "PrometheusText0.0.4"}, "metrics_path": "/metrics", "scheme": "http", "relabel_configs": nil, @@ -218,19 +338,16 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { }, }, cfg: &Config{ - PrometheusConfig: &PromConfig{GlobalConfig: promconfig.DefaultGlobalConfig}, - TargetAllocator: &TargetAllocator{ - Interval: 10 * time.Second, - CollectorID: "collector-1", - HTTPSDConfig: &PromHTTPSDConfig{ - HTTPClientConfig: commonconfig.HTTPClientConfig{ - BasicAuth: &commonconfig.BasicAuth{ - Username: "user", - Password: "aPassword", - }, + Interval: 10 * time.Second, + CollectorID: "collector-1", + HTTPSDConfig: &PromHTTPSDConfig{ + HTTPClientConfig: commonconfig.HTTPClientConfig{ + BasicAuth: &commonconfig.BasicAuth{ + Username: "user", + Password: "aPassword", }, - RefreshInterval: model.Duration(60 * time.Second), }, + RefreshInterval: model.Duration(60 * time.Second), }, }, want: expectedTestResult{ @@ -261,6 +378,7 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { "job_name": "job1", "scrape_interval": "30s", "scrape_timeout": "30s", + "scrape_protocols": []string{"OpenMetricsText1.0.0", "OpenMetricsText0.0.1", "PrometheusText0.0.4"}, "metrics_path": "/metrics", "scheme": "http", "relabel_configs": nil, @@ -270,6 +388,7 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { "job_name": "job2", "scrape_interval": "30s", "scrape_timeout": "30s", + "scrape_protocols": []string{"OpenMetricsText1.0.0", "OpenMetricsText0.0.1", "PrometheusText0.0.4"}, "metrics_path": "/metrics", "scheme": "http", "relabel_configs": nil, @@ -312,14 +431,11 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { }, }, cfg: &Config{ - PrometheusConfig: &PromConfig{GlobalConfig: promconfig.DefaultGlobalConfig}, - TargetAllocator: &TargetAllocator{ - Interval: 10 * time.Second, - CollectorID: "collector-1", - HTTPSDConfig: &PromHTTPSDConfig{ - HTTPClientConfig: commonconfig.HTTPClientConfig{}, - RefreshInterval: model.Duration(60 * time.Second), - }, + Interval: 10 * time.Second, + CollectorID: "collector-1", + HTTPSDConfig: &PromHTTPSDConfig{ + HTTPClientConfig: commonconfig.HTTPClientConfig{}, + RefreshInterval: model.Duration(60 * time.Second), }, }, want: expectedTestResult{ @@ -353,6 +469,7 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { "job_name": "job1", "scrape_interval": "30s", "scrape_timeout": "30s", + "scrape_protocols": []string{"OpenMetricsText1.0.0", "OpenMetricsText0.0.1", "PrometheusText0.0.4"}, "metrics_path": "/metrics", "scheme": "http", "relabel_configs": nil, @@ -362,6 +479,7 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { "job_name": "job2", "scrape_interval": "30s", "scrape_timeout": "30s", + "scrape_protocols": []string{"OpenMetricsText1.0.0", "OpenMetricsText0.0.1", "PrometheusText0.0.4"}, "metrics_path": "/metrics", "scheme": "http", "relabel_configs": nil, @@ -373,6 +491,7 @@ func 
TestTargetAllocatorJobRetrieval(t *testing.T) { "job_name": "job1", "scrape_interval": "30s", "scrape_timeout": "30s", + "scrape_protocols": []string{"OpenMetricsText1.0.0", "OpenMetricsText0.0.1", "PrometheusText0.0.4"}, "metrics_path": "/metrics", "scheme": "http", "relabel_configs": nil, @@ -382,6 +501,7 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { "job_name": "job3", "scrape_interval": "30s", "scrape_timeout": "30s", + "scrape_protocols": []string{"OpenMetricsText1.0.0", "OpenMetricsText0.0.1", "PrometheusText0.0.4"}, "metrics_path": "/metrics", "scheme": "http", "relabel_configs": nil, @@ -424,14 +544,11 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { }, }, cfg: &Config{ - PrometheusConfig: &PromConfig{GlobalConfig: promconfig.DefaultGlobalConfig}, - TargetAllocator: &TargetAllocator{ - Interval: 10 * time.Second, - CollectorID: "collector-1", - HTTPSDConfig: &PromHTTPSDConfig{ - HTTPClientConfig: commonconfig.HTTPClientConfig{}, - RefreshInterval: model.Duration(60 * time.Second), - }, + Interval: 10 * time.Second, + CollectorID: "collector-1", + HTTPSDConfig: &PromHTTPSDConfig{ + HTTPClientConfig: commonconfig.HTTPClientConfig{}, + RefreshInterval: model.Duration(60 * time.Second), }, }, want: expectedTestResult{ @@ -466,14 +583,11 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { }, }, cfg: &Config{ - PrometheusConfig: &PromConfig{GlobalConfig: promconfig.DefaultGlobalConfig}, - TargetAllocator: &TargetAllocator{ - Interval: 50 * time.Millisecond, - CollectorID: "collector-1", - HTTPSDConfig: &PromHTTPSDConfig{ - HTTPClientConfig: commonconfig.HTTPClientConfig{}, - RefreshInterval: model.Duration(60 * time.Second), - }, + Interval: 50 * time.Millisecond, + CollectorID: "collector-1", + HTTPSDConfig: &PromHTTPSDConfig{ + HTTPClientConfig: commonconfig.HTTPClientConfig{}, + RefreshInterval: model.Duration(60 * time.Second), }, }, want: expectedTestResult{ @@ -481,10 +595,95 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { jobMap: map[string]expectedTestResultJobMap{}, }, }, + { + desc: "update metric relabel config regex", + responses: Responses{ + releaserMap: map[string]int{ + "/scrape_configs": 1, + }, + responses: map[string][]mockTargetAllocatorResponseRaw{ + "/scrape_configs": { + mockTargetAllocatorResponseRaw{code: 200, data: map[string]map[string]any{ + "job1": { + "job_name": "job1", + "scrape_interval": "30s", + "scrape_timeout": "30s", + "scrape_protocols": []string{"OpenMetricsText1.0.0", "OpenMetricsText0.0.1", "PrometheusText0.0.4"}, + "metrics_path": "/metrics", + "scheme": "http", + "metric_relabel_configs": []map[string]string{ + { + "separator": ";", + "regex": "regex1", + "action": "keep", + }, + }, + }, + }}, + mockTargetAllocatorResponseRaw{code: 200, data: map[string]map[string]any{ + "job1": { + "job_name": "job1", + "scrape_interval": "30s", + "scrape_timeout": "30s", + "scrape_protocols": []string{"OpenMetricsText1.0.0", "OpenMetricsText0.0.1", "PrometheusText0.0.4"}, + "metrics_path": "/metrics", + "scheme": "http", + "metric_relabel_configs": []map[string]string{ + { + "separator": ";", + "regex": "regex2", + "action": "keep", + }, + }, + }, + }}, + }, + "/jobs/job1/targets": { + mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{ + {Targets: []string{"localhost:9090"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "node", + }}, + }}, + mockTargetAllocatorResponseRaw{code: 200, data: []hTTPSDResponse{ + {Targets: []string{"localhost:9090"}, + Labels: 
map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "node", + }}, + }}, + }, + }, + }, + cfg: &Config{ + Interval: 10 * time.Second, + CollectorID: "collector-1", + HTTPSDConfig: &PromHTTPSDConfig{ + HTTPClientConfig: commonconfig.HTTPClientConfig{}, + RefreshInterval: model.Duration(60 * time.Second), + }, + }, + want: expectedTestResult{ + empty: false, + jobMap: map[string]expectedTestResultJobMap{ + "job1": { + Targets: []string{"localhost:9090"}, + Labels: map[model.LabelName]model.LabelValue{ + "__meta_datacenter": "london", + "__meta_prometheus_job": "node", + }, + MetricRelabelConfig: &expectedMetricRelabelConfigTestResult{ + JobName: "job1", + MetricRelabelRegex: relabel.MustNewRegexp("regex2"), + }, + }, + }, + }, + }, } { t.Run(tc.desc, func(t *testing.T) { ctx := context.Background() - cms := new(consumertest.MetricsSink) allocator, err := setupMockTargetAllocator(tc.responses) require.NoError(t, err, "Failed to create allocator", tc.responses) @@ -492,17 +691,19 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { allocator.Start() defer allocator.Stop() - tc.cfg.TargetAllocator.Endpoint = allocator.srv.URL // set service URL with the automatic generated one - receiver := newPrometheusReceiver(receivertest.NewNopSettings(), tc.cfg, cms) + tc.cfg.Endpoint = allocator.srv.URL // set service URL with the automatic generated one + scrapeManager, discoveryManager := initPrometheusManagers(ctx, t) - require.NoError(t, receiver.Start(ctx, componenttest.NewNopHost())) + baseCfg := promconfig.Config{GlobalConfig: promconfig.DefaultGlobalConfig} + manager := NewManager(receivertest.NewNopSettings(), tc.cfg, &baseCfg, false) + require.NoError(t, manager.Start(ctx, componenttest.NewNopHost(), scrapeManager, discoveryManager)) allocator.wg.Wait() - providers := receiver.discoveryManager.Providers() + providers := discoveryManager.Providers() if tc.want.empty { // if no base config is supplied and the job retrieval fails then no configuration should be found - require.Len(t, providers, 0) + require.Empty(t, providers) return } @@ -516,7 +717,7 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { // are http configs applied? 
sdConfig := provider.Config().(*promHTTP.SDConfig) - require.Equal(t, tc.cfg.TargetAllocator.HTTPSDConfig.HTTPClientConfig, sdConfig.HTTPClientConfig) + require.Equal(t, tc.cfg.HTTPSDConfig.HTTPClientConfig, sdConfig.HTTPClientConfig) for _, group := range refresh { found := false @@ -532,6 +733,15 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { // which is identical to the source url s.Labels["__meta_url"] = model.LabelValue(sdConfig.URL) require.Equal(t, s.Labels, group.Labels) + if s.MetricRelabelConfig != nil { + for _, sc := range manager.promCfg.ScrapeConfigs { + if sc.JobName == s.MetricRelabelConfig.JobName { + for _, mc := range sc.MetricRelabelConfigs { + require.Equal(t, s.MetricRelabelConfig.MetricRelabelRegex, mc.Regex) + } + } + } + } found = true } require.True(t, found, "Returned job is not defined in expected values", group) @@ -540,3 +750,64 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { }) } } + +func TestConfigureSDHTTPClientConfigFromTA(t *testing.T) { + ta := &Config{} + ta.TLSSetting = configtls.ClientConfig{ + InsecureSkipVerify: true, + ServerName: "test.server", + Config: configtls.Config{ + CAFile: "/path/to/ca", + CertFile: "/path/to/cert", + KeyFile: "/path/to/key", + CAPem: configopaque.String(base64.StdEncoding.EncodeToString([]byte("test-ca"))), + CertPem: configopaque.String(base64.StdEncoding.EncodeToString([]byte("test-cert"))), + KeyPem: configopaque.String(base64.StdEncoding.EncodeToString([]byte("test-key"))), + MinVersion: "1.2", + MaxVersion: "1.3", + }, + } + ta.ProxyURL = "http://proxy.test" + + httpSD := &promHTTP.SDConfig{RefreshInterval: model.Duration(30 * time.Second)} + + err := configureSDHTTPClientConfigFromTA(httpSD, ta) + + assert.NoError(t, err) + + assert.False(t, httpSD.HTTPClientConfig.FollowRedirects) + assert.True(t, httpSD.HTTPClientConfig.TLSConfig.InsecureSkipVerify) + assert.Equal(t, "test.server", httpSD.HTTPClientConfig.TLSConfig.ServerName) + assert.Equal(t, "/path/to/ca", httpSD.HTTPClientConfig.TLSConfig.CAFile) + assert.Equal(t, "/path/to/cert", httpSD.HTTPClientConfig.TLSConfig.CertFile) + assert.Equal(t, "/path/to/key", httpSD.HTTPClientConfig.TLSConfig.KeyFile) + assert.Equal(t, "test-ca", httpSD.HTTPClientConfig.TLSConfig.CA) + assert.Equal(t, "test-cert", httpSD.HTTPClientConfig.TLSConfig.Cert) + assert.Equal(t, commonconfig.Secret("test-key"), httpSD.HTTPClientConfig.TLSConfig.Key) + assert.Equal(t, commonconfig.TLSVersions["TLS12"], httpSD.HTTPClientConfig.TLSConfig.MinVersion) + assert.Equal(t, commonconfig.TLSVersions["TLS13"], httpSD.HTTPClientConfig.TLSConfig.MaxVersion) + + parsedProxyURL, _ := url.Parse("http://proxy.test") + assert.Equal(t, commonconfig.URL{URL: parsedProxyURL}, httpSD.HTTPClientConfig.ProxyURL) + + // Test case with empty TargetAllocator + emptyTA := &Config{} + emptyHTTPSD := &promHTTP.SDConfig{RefreshInterval: model.Duration(30 * time.Second)} + + err = configureSDHTTPClientConfigFromTA(emptyHTTPSD, emptyTA) + + assert.NoError(t, err) +} + +func initPrometheusManagers(ctx context.Context, t *testing.T) (*scrape.Manager, *discovery.Manager) { + logger := log.NewNopLogger() + reg := prometheus.NewRegistry() + sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg)) + require.NoError(t, err) + discoveryManager := discovery.NewManager(ctx, logger, reg, sdMetrics) + require.NotNil(t, discoveryManager) + + scrapeManager, err := scrape.NewManager(&scrape.Options{}, logger, nil, reg) + require.NoError(t, err) + return scrapeManager, discoveryManager +} 
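Note on the hashing added in manager.go (and exercised by TestGetScrapeConfigHash above): job names are sorted before being written, together with their YAML-encoded scrape configs, into an FNV-64 hash, so two maps holding the same jobs in different insertion order produce the same value. A minimal standalone sketch of that behavior follows (illustrative names only, not code from this change):

```go
// A standalone sketch of the ordering-insensitive hash; names are illustrative.
package main

import (
	"fmt"
	"hash/fnv"
	"sort"

	"gopkg.in/yaml.v2"
)

// hashJobs feeds sorted job names and their YAML-encoded configs into an
// FNV-64 hash, so map insertion order does not influence the result.
func hashJobs(jobs map[string]map[string]string) (uint64, error) {
	h := fnv.New64()
	enc := yaml.NewEncoder(h)

	names := make([]string, 0, len(jobs))
	for name := range jobs {
		names = append(names, name)
	}
	sort.Strings(names)

	for _, name := range names {
		if _, err := h.Write([]byte(name)); err != nil {
			return 0, err
		}
		if err := enc.Encode(jobs[name]); err != nil {
			return 0, err
		}
	}
	if err := enc.Close(); err != nil {
		return 0, err
	}
	return h.Sum64(), nil
}

func main() {
	a := map[string]map[string]string{
		"job1": {"metrics_path": "/metrics"},
		"job2": {"scheme": "http"},
	}
	b := map[string]map[string]string{
		"job2": {"scheme": "http"},
		"job1": {"metrics_path": "/metrics"},
	}
	ha, _ := hashJobs(a)
	hb, _ := hashJobs(b)
	fmt.Println(ha == hb) // true: same jobs, different insertion order
}
```

This is why hash1 and hash2 are expected to be equal in the test even though the two maps are populated in different orders.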
diff --git a/receiver/prometheusreceiver/targetallocator/testdata/config.yaml b/receiver/prometheusreceiver/targetallocator/testdata/config.yaml new file mode 100644 index 000000000000..2f243000155d --- /dev/null +++ b/receiver/prometheusreceiver/targetallocator/testdata/config.yaml @@ -0,0 +1,7 @@ +target_allocator: + endpoint: http://localhost:8080 + timeout: 5s + tls: + cert_file: "client.crt" + interval: 30s + collector_id: collector-1 diff --git a/receiver/prometheusreceiver/targetallocator/testdata/dummy-tls-cert-file b/receiver/prometheusreceiver/targetallocator/testdata/dummy-tls-cert-file new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/receiver/prometheusreceiver/targetallocator/testdata/dummy-tls-key-file b/receiver/prometheusreceiver/targetallocator/testdata/dummy-tls-key-file new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/receiver/prometheusreceiver/targetallocator/testdata/k8-config.yaml b/receiver/prometheusreceiver/targetallocator/testdata/k8-config.yaml new file mode 100644 index 000000000000..8554f98bfcad --- /dev/null +++ b/receiver/prometheusreceiver/targetallocator/testdata/k8-config.yaml @@ -0,0 +1,4 @@ +target_allocator: + endpoint: http://target-allocator-service:80 + interval: 30s + \ No newline at end of file diff --git a/receiver/prometheusreceiver/testdata/config_k8s.yaml b/receiver/prometheusreceiver/testdata/config_k8s.yaml index cdbe0773d5ea..8183aa128ee8 100644 --- a/receiver/prometheusreceiver/testdata/config_k8s.yaml +++ b/receiver/prometheusreceiver/testdata/config_k8s.yaml @@ -1,28 +1,28 @@ prometheus: config: scrape_configs: - - job_name: apps - kubernetes_sd_configs: - - role: pod - selectors: - - role: pod - # only scrape data from pods running on the same node as collector - field: "spec.nodeName=${NODE_NAME}" - relabel_configs: - # scrape pods annotated with "prometheus.io/scrape: true" - - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape] - regex: "true" - action: keep - # read the port from "prometheus.io/port: " annotation and update scraping address accordingly - - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port] - action: replace - target_label: __address__ - regex: ([^:]+)(?::\d+)?;(\d+) - # escaped $1:$2 - replacement: $$1:$$2 - - source_labels: [__meta_kubernetes_namespace] - action: replace - target_label: kubernetes_namespace - - source_labels: [__meta_kubernetes_pod_name] - action: replace - target_label: kubernetes_pod_name + - job_name: apps + kubernetes_sd_configs: + - role: pod + selectors: + - role: pod + # only scrape data from pods running on the same node as collector + field: "spec.nodeName=${NODE_NAME}" + relabel_configs: + # scrape pods annotated with "prometheus.io/scrape: true" + - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape] + regex: "true" + action: keep + # read the port from "prometheus.io/port: " annotation and update scraping address accordingly + - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port] + action: replace + target_label: __address__ + regex: ([^:]+)(?::\d+)?;(\d+) + # escaped $1:$2 + replacement: $$1:$$2 + - source_labels: [__meta_kubernetes_namespace] + action: replace + target_label: kubernetes_namespace + - source_labels: [__meta_kubernetes_pod_name] + action: replace + target_label: kubernetes_pod_name \ No newline at end of file diff --git a/receiver/prometheusreceiver/testdata/config_target_allocator.yaml 
b/receiver/prometheusreceiver/testdata/config_target_allocator.yaml index 9c708ed58de5..cd3aac8f34fd 100644 --- a/receiver/prometheusreceiver/testdata/config_target_allocator.yaml +++ b/receiver/prometheusreceiver/testdata/config_target_allocator.yaml @@ -20,3 +20,11 @@ prometheus/withOnlyScrape: scrape_configs: - job_name: 'demo' scrape_interval: 5s +prometheus/k8Setup: + config: + global: + scrape_interval: 15s + scrape_timeout: 10s + target_allocator: + endpoint: https://target-allocator-service:80 + interval: 30s \ No newline at end of file
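Finally, a usage sketch of the relocated configuration (assumed wiring that mirrors the new k8Setup fixture above, not a prescribed API): when collector_id is omitted, targetallocator.Config.Unmarshal is expected to fall back to the POD_NAME environment variable.

```go
package main

import (
	"fmt"
	"os"

	"go.opentelemetry.io/collector/confmap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/targetallocator"
)

func main() {
	// Mirrors the k8Setup test: collector_id is not present in the map below,
	// so Config.Unmarshal should fall back to POD_NAME.
	os.Setenv("POD_NAME", "collector-1")

	conf := confmap.NewFromStringMap(map[string]any{
		"endpoint": "https://target-allocator-service:80",
		"interval": "30s",
	})

	cfg := &targetallocator.Config{}
	if err := conf.Unmarshal(cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.CollectorID) // collector-1 (taken from POD_NAME)
	fmt.Println(cfg.Endpoint)    // https://target-allocator-service:80
}
```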