From c686e6df0e3309faf0ac894f5f53c5a209fd88be Mon Sep 17 00:00:00 2001 From: Ed Robinson Date: Mon, 27 Nov 2023 21:30:55 +0000 Subject: [PATCH] HTTP service discovery (#16) * Extract fileSD function * Add option for http service discovery * Update docs * Fix code block in readme * Clarify scrape interval parameter --------- Co-authored-by: Josh Mills --- README.md | 29 +++++++++++++-- main.go | 103 ++++++++++++++++++++++++++++++++++++------------------ 2 files changed, 96 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index fad3f78..3d46e5e 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ Service discovery for [AWS MSK](https://aws.amazon.com/msk/), compatible with [P ## How it works -This service gets a list of MSK clusters in an AWS account and exports each broker to a Prometheus-compatible static config to be used with the [`file_sd_config`](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config) mechanism. +This service gets a list of MSK clusters in an AWS account and exports each broker to a Prometheus-compatible static config to be used with the [`file_sd_config`](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config) or [`http_sd_config`](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config) mechanism. ## Pre-requisites @@ -44,12 +44,20 @@ When using AWS credentials or IAM Roles, the following policy needs to be attach Usage of ./prometheus-msk-discovery: -filter string a regex pattern to filter cluster names from the results + -http-sd + expose http_sd interface rather than writing a file -job-prefix string string with which to prefix each job label (default "msk") + -listen-address string + Address to listen on for http service discovery (default ":8080") -output string path of the file to write MSK discovery information to (default "msk_file_sd.yml") + -region string + the aws region in which to scan for MSK clusters -scrape-interval duration - interval at which to scrape the AWS API for MSK cluster information (default 5m0s) + interval at which to scrape the AWS API for MSK cluster information when in file_sd mode (default 5m0s) + -tag value + A key=value for filtering by tags. Flag can be specified multiple times, resulting OR expression. ``` ### Example output: @@ -61,6 +69,23 @@ $ ./prometheus-msk-discovery -scrape-interval 10s -filter 'primary' An example output file can be found [here](examples/msk_file_sd.yml) +### http_sd + +``` +$ ./prometheus-msk-discovery -http-sd -listen-address :8989 -filter 'primary' +``` + +``` +$ curl localhost:8989 +[{"targets":["b-1.primary-kafka.tffs8g.c2.kafka.eu-west-2.amazonaws.com:11001","b-1.primary-kafka.tffs8g.c2.kafka.eu-west-2.amazonaws.com:11002","b-2.primary-kafka.tffs8g.c2.kafka.eu-west-2.amazonaws.com:11001","b-2.primary-kafka.tffs8g.c2.kafka.eu-west-2.amazonaws.com:11002"],"labels":{"job":"msk-primary-kafka","cluster_name":"primary-kafka","cluster_arn":"arn:aws:kafka:eu-west-2:111111111111:cluster/primary-kafka/522d90ab-d400-4ea0-b8fd-bbf3576425d4-2"}}] +``` + +```yaml +http_sd_configs: + - url: http://localhost:8989 + refresh_interval: 30s +``` + ## Region Precedence When no region is specified with the `-region` flag the process first attempts to load the default SDK configuration checking for an `AWS_REGION` environment variable or reading any region specified in the standard [configuration file](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html). If no region is found it will attempt to retrieve it from the EC2 Instance Metadata Service. diff --git a/main.go b/main.go index cc830d6..0ae0b24 100644 --- a/main.go +++ b/main.go @@ -2,10 +2,12 @@ package main import ( "context" + "encoding/json" "flag" "fmt" "io/ioutil" "log" + "net/http" "regexp" "strings" "time" @@ -24,11 +26,13 @@ const ( type tags map[string]string var ( - outFile = flag.String("output", "msk_file_sd.yml", "path of the file to write MSK discovery information to") - interval = flag.Duration("scrape-interval", 5*time.Minute, "interval at which to scrape the AWS API for MSK cluster information") - jobPrefix = flag.String("job-prefix", "msk", "string with which to prefix each job label") - clusterFilter = flag.String("filter", "", "a regex pattern to filter cluster names from the results") - awsRegion = flag.String("region", "", "the aws region in which to scan for MSK clusters") + outFile = flag.String("output", "msk_file_sd.yml", "path of the file to write MSK discovery information to") + interval = flag.Duration("scrape-interval", 5*time.Minute, "interval at which to scrape the AWS API for MSK cluster information when in file_sd mode") + jobPrefix = flag.String("job-prefix", "msk", "string with which to prefix each job label") + clusterFilter = flag.String("filter", "", "a regex pattern to filter cluster names from the results") + awsRegion = flag.String("region", "", "the aws region in which to scan for MSK clusters") + httpSDEnabled = flag.Bool("http-sd", false, "expose http_sd interface rather than writing a file") + listenAddress = flag.String("listen-address", ":8080", "Address to listen on for http service discovery") ) type kafkaClient interface { @@ -37,16 +41,16 @@ type kafkaClient interface { } type labels struct { - Job string `yaml:"job"` - ClusterName string `yaml:"cluster_name"` - ClusterArn string `yaml:"cluster_arn"` + Job string `yaml:"job" json:"job"` + ClusterName string `yaml:"cluster_name" json:"cluster_name"` + ClusterArn string `yaml:"cluster_arn" json:"cluster_arn"` } // PrometheusStaticConfig is the final structure of a single static config that -// will be outputted to the Prometheus file service discovery config +// will be outputted to the Prometheus file/http service discovery config type PrometheusStaticConfig struct { - Targets []string `yaml:"targets"` - Labels labels `yaml:"labels"` + Targets []string `yaml:"targets" json:"targets"` + Labels labels `yaml:"labels" json:"labels"` } // clusterDetails holds details of cluster, each broker, and which OpenMetrics endpoints are enabled @@ -205,30 +209,8 @@ func GetStaticConfigs(svc kafkaClient, opt_filter ...Filter) ([]PrometheusStatic return staticConfigs, nil } -func main() { - var tagFilters tags = make(tags) - flag.Var(&tagFilters, "tag", "A key=value for filtering by tags. Flag can be specified multiple times, resulting OR expression.") - flag.Parse() - - cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(*awsRegion), config.WithEC2IMDSRegion()) - if err != nil { - fmt.Println(err) - return - } - - client := kafka.NewFromConfig(cfg) - +func fileSD(client *kafka.Client, filter Filter) { work := func() { - regexpFilter, err := regexp.Compile(*clusterFilter) - if err != nil { - fmt.Println(err) - return - } - - filter := Filter{ - NameFilter: *regexpFilter, - TagFilter: tagFilters, - } staticConfigs, err := GetStaticConfigs(client, filter) if err != nil { @@ -260,3 +242,56 @@ func main() { work() } } + +func httpSD(client *kafka.Client, filter Filter) { + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + staticConfigs, err := GetStaticConfigs(client, filter) + if err != nil { + log.Println(err) + http.Error(w, "Internal Server Error", 500) + return + } + m, err := json.Marshal(staticConfigs) + if err != nil { + log.Println(err) + http.Error(w, "Internal Server Error", 500) + return + } + w.Header().Set("Content-Type", "application/json") + w.Write(m) + return + }) + + log.Fatal(http.ListenAndServe(*listenAddress, nil)) +} + +func main() { + var tagFilters tags = make(tags) + flag.Var(&tagFilters, "tag", "A key=value for filtering by tags. Flag can be specified multiple times, resulting OR expression.") + flag.Parse() + + cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(*awsRegion), config.WithEC2IMDSRegion()) + if err != nil { + fmt.Println(err) + return + } + + client := kafka.NewFromConfig(cfg) + + regexpFilter, err := regexp.Compile(*clusterFilter) + if err != nil { + fmt.Println(err) + return + } + + filter := Filter{ + NameFilter: *regexpFilter, + TagFilter: tagFilters, + } + + if *httpSDEnabled { + httpSD(client, filter) + } else { + fileSD(client, filter) + } +}