Skip to content

Commit

Permalink
HTTP service discovery (#16)
Browse files Browse the repository at this point in the history
* Extract fileSD function

* Add option for http service discovery

* Update docs

* Fix code block in readme

* Clarify scrape interval parameter

---------

Co-authored-by: Josh Mills <[email protected]>
  • Loading branch information
errm and joshm91 authored Nov 27, 2023
1 parent aaea76a commit c686e6d
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 36 deletions.
29 changes: 27 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Service discovery for [AWS MSK](https://aws.amazon.com/msk/), compatible with [P

## How it works

This service gets a list of MSK clusters in an AWS account and exports each broker to a Prometheus-compatible static config to be used with the [`file_sd_config`](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config) mechanism.
This service gets a list of MSK clusters in an AWS account and exports each broker to a Prometheus-compatible static config to be used with the [`file_sd_config`](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config) or [`http_sd_config`](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config) mechanism.

## Pre-requisites

Expand Down Expand Up @@ -44,12 +44,20 @@ When using AWS credentials or IAM Roles, the following policy needs to be attach
Usage of ./prometheus-msk-discovery:
-filter string
a regex pattern to filter cluster names from the results
-http-sd
expose http_sd interface rather than writing a file
-job-prefix string
string with which to prefix each job label (default "msk")
-listen-address string
Address to listen on for http service discovery (default ":8080")
-output string
path of the file to write MSK discovery information to (default "msk_file_sd.yml")
-region string
the aws region in which to scan for MSK clusters
-scrape-interval duration
interval at which to scrape the AWS API for MSK cluster information (default 5m0s)
interval at which to scrape the AWS API for MSK cluster information when in file_sd mode (default 5m0s)
-tag value
A key=value for filtering by tags. Flag can be specified multiple times, resulting OR expression.
```

### Example output:
Expand All @@ -61,6 +69,23 @@ $ ./prometheus-msk-discovery -scrape-interval 10s -filter 'primary'

An example output file can be found [here](examples/msk_file_sd.yml)

### http_sd

```
$ ./prometheus-msk-discovery -http-sd -listen-address :8989 -filter 'primary'
```

```
$ curl localhost:8989
[{"targets":["b-1.primary-kafka.tffs8g.c2.kafka.eu-west-2.amazonaws.com:11001","b-1.primary-kafka.tffs8g.c2.kafka.eu-west-2.amazonaws.com:11002","b-2.primary-kafka.tffs8g.c2.kafka.eu-west-2.amazonaws.com:11001","b-2.primary-kafka.tffs8g.c2.kafka.eu-west-2.amazonaws.com:11002"],"labels":{"job":"msk-primary-kafka","cluster_name":"primary-kafka","cluster_arn":"arn:aws:kafka:eu-west-2:111111111111:cluster/primary-kafka/522d90ab-d400-4ea0-b8fd-bbf3576425d4-2"}}]
```

```yaml
http_sd_configs:
- url: http://localhost:8989
refresh_interval: 30s
```

## Region Precedence
When no region is specified with the `-region` flag the process first attempts to load the default SDK configuration checking for an `AWS_REGION` environment variable or reading any region specified in the standard [configuration file](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html). If no region is found it will attempt to retrieve it from the EC2 Instance Metadata Service.

Expand Down
103 changes: 69 additions & 34 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"regexp"
"strings"
"time"
Expand All @@ -24,11 +26,13 @@ const (
type tags map[string]string

var (
outFile = flag.String("output", "msk_file_sd.yml", "path of the file to write MSK discovery information to")
interval = flag.Duration("scrape-interval", 5*time.Minute, "interval at which to scrape the AWS API for MSK cluster information")
jobPrefix = flag.String("job-prefix", "msk", "string with which to prefix each job label")
clusterFilter = flag.String("filter", "", "a regex pattern to filter cluster names from the results")
awsRegion = flag.String("region", "", "the aws region in which to scan for MSK clusters")
outFile = flag.String("output", "msk_file_sd.yml", "path of the file to write MSK discovery information to")
interval = flag.Duration("scrape-interval", 5*time.Minute, "interval at which to scrape the AWS API for MSK cluster information when in file_sd mode")
jobPrefix = flag.String("job-prefix", "msk", "string with which to prefix each job label")
clusterFilter = flag.String("filter", "", "a regex pattern to filter cluster names from the results")
awsRegion = flag.String("region", "", "the aws region in which to scan for MSK clusters")
httpSDEnabled = flag.Bool("http-sd", false, "expose http_sd interface rather than writing a file")
listenAddress = flag.String("listen-address", ":8080", "Address to listen on for http service discovery")
)

type kafkaClient interface {
Expand All @@ -37,16 +41,16 @@ type kafkaClient interface {
}

type labels struct {
Job string `yaml:"job"`
ClusterName string `yaml:"cluster_name"`
ClusterArn string `yaml:"cluster_arn"`
Job string `yaml:"job" json:"job"`
ClusterName string `yaml:"cluster_name" json:"cluster_name"`
ClusterArn string `yaml:"cluster_arn" json:"cluster_arn"`
}

// PrometheusStaticConfig is the final structure of a single static config that
// will be outputted to the Prometheus file service discovery config
// will be outputted to the Prometheus file/http service discovery config
type PrometheusStaticConfig struct {
Targets []string `yaml:"targets"`
Labels labels `yaml:"labels"`
Targets []string `yaml:"targets" json:"targets"`
Labels labels `yaml:"labels" json:"labels"`
}

// clusterDetails holds details of cluster, each broker, and which OpenMetrics endpoints are enabled
Expand Down Expand Up @@ -205,30 +209,8 @@ func GetStaticConfigs(svc kafkaClient, opt_filter ...Filter) ([]PrometheusStatic
return staticConfigs, nil
}

func main() {
var tagFilters tags = make(tags)
flag.Var(&tagFilters, "tag", "A key=value for filtering by tags. Flag can be specified multiple times, resulting OR expression.")
flag.Parse()

cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(*awsRegion), config.WithEC2IMDSRegion())
if err != nil {
fmt.Println(err)
return
}

client := kafka.NewFromConfig(cfg)

func fileSD(client *kafka.Client, filter Filter) {
work := func() {
regexpFilter, err := regexp.Compile(*clusterFilter)
if err != nil {
fmt.Println(err)
return
}

filter := Filter{
NameFilter: *regexpFilter,
TagFilter: tagFilters,
}

staticConfigs, err := GetStaticConfigs(client, filter)
if err != nil {
Expand Down Expand Up @@ -260,3 +242,56 @@ func main() {
work()
}
}

// httpSD serves MSK discovery information over HTTP for Prometheus'
// http_sd_config mechanism. Every request triggers a fresh call to
// GetStaticConfigs, so responses always reflect the current set of
// clusters. The function blocks in ListenAndServe and only returns
// (via log.Fatal) if the server fails to start or dies.
func httpSD(client *kafka.Client, filter Filter) {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		staticConfigs, err := GetStaticConfigs(client, filter)
		if err != nil {
			log.Println(err)
			http.Error(w, "Internal Server Error", http.StatusInternalServerError)
			return
		}
		m, err := json.Marshal(staticConfigs)
		if err != nil {
			log.Println(err)
			http.Error(w, "Internal Server Error", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		// The payload is fully buffered in m, so a write failure here only
		// means the client disconnected; log it rather than failing.
		if _, err := w.Write(m); err != nil {
			log.Println(err)
		}
	})

	log.Fatal(http.ListenAndServe(*listenAddress, nil))
}

// main parses command-line flags, builds an AWS MSK (Kafka) client,
// compiles the cluster name/tag filters, and then runs in either
// http_sd mode (serve discovery data over HTTP) or file_sd mode
// (periodically write a discovery file), per the -http-sd flag.
func main() {
	var tagFilters tags = make(tags)
	flag.Var(&tagFilters, "tag", "A key=value for filtering by tags. Flag can be specified multiple times, resulting OR expression.")
	flag.Parse()

	// Region resolution order: the -region flag, then the default SDK
	// config (AWS_REGION / shared config file), then EC2 instance metadata.
	cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(*awsRegion), config.WithEC2IMDSRegion())
	if err != nil {
		// log.Fatal (not fmt.Println + return) so a misconfigured run
		// exits non-zero and is visible to supervisors/CI.
		log.Fatal(err)
	}

	client := kafka.NewFromConfig(cfg)

	regexpFilter, err := regexp.Compile(*clusterFilter)
	if err != nil {
		log.Fatal(err)
	}

	filter := Filter{
		NameFilter: *regexpFilter,
		TagFilter:  tagFilters,
	}

	if *httpSDEnabled {
		httpSD(client, filter)
	} else {
		fileSD(client, filter)
	}
}

0 comments on commit c686e6d

Please sign in to comment.