diff --git a/README.md b/README.md index 2ced3e4..dd5bee8 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,8 @@ When using AWS credentials or IAM Roles, the following policy needs to be attach ``` Usage of ./prometheus-msk-discovery: + -filter string + a regex pattern to filter cluster names from the results -job-prefix string string with which to prefix each job label (default "msk") -output string @@ -53,7 +55,7 @@ Usage of ./prometheus-msk-discovery: ### Example output: ``` -$ ./prometheus-msk-discovery -scrape-interval 10s +$ ./prometheus-msk-discovery -scrape-interval 10s -filter 'primary' 2021/04/04 21:02:55 Writing 1 discovered exporters to msk_file_sd.yml ``` diff --git a/main.go b/main.go index 64da58d..bdcfc3b 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "fmt" "io/ioutil" "log" + "regexp" "strings" "time" @@ -21,6 +22,7 @@ const nodeExporterPort = 11002 var outFile = flag.String("output", "msk_file_sd.yml", "path of the file to write MSK discovery information to") var interval = flag.Duration("scrape-interval", 5*time.Minute, "interval at which to scrape the AWS API for MSK cluster information") var jobPrefix = flag.String("job-prefix", "msk", "string with which to prefix each job label") +var clusterFilter = flag.String("filter", "", "a regex pattern to filter cluster names from the results") type kafkaClient interface { ListClusters(ctx context.Context, params *kafka.ListClustersInput, optFns ...func(*kafka.Options)) (*kafka.ListClustersOutput, error) @@ -125,14 +127,33 @@ func buildClusterDetails(svc kafkaClient, c types.ClusterInfo) (clusterDetails, return cluster, nil } +func filterClusters(clusters kafka.ListClustersOutput, filter regexp.Regexp) *kafka.ListClustersOutput { + var filteredClusters []types.ClusterInfo + + for _, cluster := range clusters.ClusterInfoList { + if filter.MatchString(*cluster.ClusterName) { + filteredClusters = append(filteredClusters, cluster) + } + } + + return &kafka.ListClustersOutput{ClusterInfoList: filteredClusters} +} + // GetStaticConfigs pulls a list of MSK clusters and brokers and returns a slice of PrometheusStaticConfigs -func GetStaticConfigs(svc kafkaClient) ([]PrometheusStaticConfig, error) { +func GetStaticConfigs(svc kafkaClient, opt_filter ...regexp.Regexp) ([]PrometheusStaticConfig, error) { + filter, _ := regexp.Compile(``) + if len(opt_filter) > 0 { + filter = &opt_filter[0] + } + clusters, err := getClusters(svc) if err != nil { return []PrometheusStaticConfig{}, err } staticConfigs := []PrometheusStaticConfig{} + clusters = filterClusters(*clusters, *filter) + for _, cluster := range clusters.ClusterInfoList { clusterDetails, err := buildClusterDetails(svc, cluster) if err != nil { @@ -160,7 +181,13 @@ func main() { work := func() { - staticConfigs, err := GetStaticConfigs(client) + regexpFilter, err := regexp.Compile(*clusterFilter) + if err != nil { + fmt.Println(err) + return + } + + staticConfigs, err := GetStaticConfigs(client, *regexpFilter) if err != nil { fmt.Println(err) return diff --git a/main_test.go b/main_test.go index d9ee06a..2eebb0b 100644 --- a/main_test.go +++ b/main_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "reflect" + "regexp" "sort" "testing" @@ -160,3 +161,75 @@ func TestGetStaticConfigs(t *testing.T) { }) } + +func strPtr(str string) *string { + return &str +} + +func Test_filterClusters(t *testing.T) { + type args struct { + clusters kafka.ListClustersOutput + filter regexp.Regexp + } + + defaultFilter, _ := regexp.Compile(``) + testClusterFilter, _ := regexp.Compile(`test`) + + tests := []struct { + name string + args args + want *kafka.ListClustersOutput + }{ + { + name: "empty-filter", + args: args{ + clusters: kafka.ListClustersOutput{ + ClusterInfoList: []types.ClusterInfo{ + { + ClusterName: strPtr("test-cluster"), + }, + }, + }, + filter: *defaultFilter, + }, + want: &kafka.ListClustersOutput{ + ClusterInfoList: []types.ClusterInfo{ + { + ClusterName: strPtr("test-cluster"), + }, + }, + }, + }, + { + name: "test-cluster-filter", + args: args{ + clusters: kafka.ListClustersOutput{ + ClusterInfoList: []types.ClusterInfo{ + { + ClusterName: strPtr("test-cluster"), + }, + { + ClusterName: strPtr("filtered-cluster"), + }, + }, + }, + filter: *testClusterFilter, + }, + want: &kafka.ListClustersOutput{ + ClusterInfoList: []types.ClusterInfo{ + { + ClusterName: strPtr("test-cluster"), + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := filterClusters(tt.args.clusters, tt.args.filter); !reflect.DeepEqual(got, tt.want) { + t.Errorf("filterClusters() = %v, want %v", got, tt.want) + } + }) + } +}