Skip to content

Commit

Permalink
Merge pull request #6 from statsbomb/feature/regex_cluster_filter
Browse files Browse the repository at this point in the history
Implement regex filter on cluster name
  • Loading branch information
joshm91 authored Sep 29, 2021
2 parents 30170e2 + 9cb2996 commit cb180f6
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 3 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ When using AWS credentials or IAM Roles, the following policy needs to be attach

```
Usage of ./prometheus-msk-discovery:
-filter string
a regex pattern to filter cluster names from the results
-job-prefix string
string with which to prefix each job label (default "msk")
-output string
Expand All @@ -53,7 +55,7 @@ Usage of ./prometheus-msk-discovery:
### Example output:

```
$ ./prometheus-msk-discovery -scrape-interval 10s
$ ./prometheus-msk-discovery -scrape-interval 10s -filter 'primary'
2021/04/04 21:02:55 Writing 1 discovered exporters to msk_file_sd.yml
```

Expand Down
31 changes: 29 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io/ioutil"
"log"
"regexp"
"strings"
"time"

Expand All @@ -21,6 +22,7 @@ const nodeExporterPort = 11002
var outFile = flag.String("output", "msk_file_sd.yml", "path of the file to write MSK discovery information to")
var interval = flag.Duration("scrape-interval", 5*time.Minute, "interval at which to scrape the AWS API for MSK cluster information")
var jobPrefix = flag.String("job-prefix", "msk", "string with which to prefix each job label")
var clusterFilter = flag.String("filter", "", "a regex pattern to filter cluster names from the results")

type kafkaClient interface {
ListClusters(ctx context.Context, params *kafka.ListClustersInput, optFns ...func(*kafka.Options)) (*kafka.ListClustersOutput, error)
Expand Down Expand Up @@ -125,14 +127,33 @@ func buildClusterDetails(svc kafkaClient, c types.ClusterInfo) (clusterDetails,
return cluster, nil
}

// filterClusters returns a new ListClustersOutput containing only the clusters
// whose ClusterName matches filter. The input is not modified.
//
// Clusters with a nil ClusterName are skipped defensively: the AWS SDK models
// ClusterName as *string, so dereferencing it unconditionally would panic on
// malformed or partial API responses.
func filterClusters(clusters kafka.ListClustersOutput, filter regexp.Regexp) *kafka.ListClustersOutput {
	var filteredClusters []types.ClusterInfo

	for _, cluster := range clusters.ClusterInfoList {
		// Guard against a nil name before dereferencing.
		if cluster.ClusterName == nil {
			continue
		}
		if filter.MatchString(*cluster.ClusterName) {
			filteredClusters = append(filteredClusters, cluster)
		}
	}

	return &kafka.ListClustersOutput{ClusterInfoList: filteredClusters}
}

// GetStaticConfigs pulls a list of MSK clusters and brokers and returns a slice of PrometheusStaticConfigs
func GetStaticConfigs(svc kafkaClient) ([]PrometheusStaticConfig, error) {
func GetStaticConfigs(svc kafkaClient, opt_filter ...regexp.Regexp) ([]PrometheusStaticConfig, error) {
filter, _ := regexp.Compile(``)
if len(opt_filter) > 0 {
filter = &opt_filter[0]
}

clusters, err := getClusters(svc)
if err != nil {
return []PrometheusStaticConfig{}, err
}
staticConfigs := []PrometheusStaticConfig{}

clusters = filterClusters(*clusters, *filter)

for _, cluster := range clusters.ClusterInfoList {
clusterDetails, err := buildClusterDetails(svc, cluster)
if err != nil {
Expand Down Expand Up @@ -160,7 +181,13 @@ func main() {

work := func() {

staticConfigs, err := GetStaticConfigs(client)
regexpFilter, err := regexp.Compile(*clusterFilter)
if err != nil {
fmt.Println(err)
return
}

staticConfigs, err := GetStaticConfigs(client, *regexpFilter)
if err != nil {
fmt.Println(err)
return
Expand Down
73 changes: 73 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
"regexp"
"sort"
"testing"

Expand Down Expand Up @@ -160,3 +161,75 @@ func TestGetStaticConfigs(t *testing.T) {
})

}

// strPtr returns a pointer to a copy of str. It is a test helper for
// populating the *string fields used by the AWS SDK types.
func strPtr(str string) *string {
	s := str
	return &s
}

func Test_filterClusters(t *testing.T) {
type args struct {
clusters kafka.ListClustersOutput
filter regexp.Regexp
}

defaultFilter, _ := regexp.Compile(``)
testClusterFilter, _ := regexp.Compile(`test`)

tests := []struct {
name string
args args
want *kafka.ListClustersOutput
}{
{
name: "empty-filter",
args: args{
clusters: kafka.ListClustersOutput{
ClusterInfoList: []types.ClusterInfo{
{
ClusterName: strPtr("test-cluster"),
},
},
},
filter: *defaultFilter,
},
want: &kafka.ListClustersOutput{
ClusterInfoList: []types.ClusterInfo{
{
ClusterName: strPtr("test-cluster"),
},
},
},
},
{
name: "test-cluster-filter",
args: args{
clusters: kafka.ListClustersOutput{
ClusterInfoList: []types.ClusterInfo{
{
ClusterName: strPtr("test-cluster"),
},
{
ClusterName: strPtr("filtered-cluster"),
},
},
},
filter: *testClusterFilter,
},
want: &kafka.ListClustersOutput{
ClusterInfoList: []types.ClusterInfo{
{
ClusterName: strPtr("test-cluster"),
},
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := filterClusters(tt.args.clusters, tt.args.filter); !reflect.DeepEqual(got, tt.want) {
t.Errorf("filterClusters() = %v, want %v", got, tt.want)
}
})
}
}

0 comments on commit cb180f6

Please sign in to comment.