Skip to content

Commit

Permalink
Refactor and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
joshm91 committed Apr 7, 2021
1 parent e88c30a commit a086bc4
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 27 deletions.
23 changes: 23 additions & 0 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
name: Release

on:
push:
tags:
- "v*.*.*"

jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Build
run: |
go build -o prometheus-msk-discovery-linux-amd64
- name: Release
uses: softprops/action-gh-release@v1
if: startsWith(github.ref, 'refs/tags/')
with:
files: "prometheus-msk-discovery-linux-amd64"
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
11 changes: 11 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
on: push
jobs:
test:
name: Run unit tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- uses: actions/setup-go@v2
with:
go-version: "1.16"
- run: go test .
49 changes: 22 additions & 27 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ type PrometheusStaticConfig struct {
Labels labels `yaml:"labels"`
}

// ClusterDetails holds details of cluster, each broker, and which OpenMetrics endpoints are enabled
type ClusterDetails struct {
// clusterDetails holds details of cluster, each broker, and which OpenMetrics endpoints are enabled
type clusterDetails struct {
ClusterName string
ClusterArn string
Brokers []string
Expand All @@ -50,7 +50,7 @@ type ClusterDetails struct {
}

// (ClusterDetails).StaticConfig generates a PrometheusStaticConfig based on the cluster's details
func (c ClusterDetails) StaticConfig() PrometheusStaticConfig {
func (c clusterDetails) StaticConfig() PrometheusStaticConfig {
ret := PrometheusStaticConfig{}
ret.Labels = labels{
Job: strings.Join([]string{*jobPrefix, c.ClusterName}, "-"),
Expand Down Expand Up @@ -103,14 +103,14 @@ func GetBrokers(svc kafkaClient, arn string) ([]string, error) {
}

// BuildClusterDetails extracts the relevant details from a ClusterInfo and returns a ClusterDetails
func BuildClusterDetails(svc kafkaClient, c types.ClusterInfo) (ClusterDetails, error) {
func BuildClusterDetails(svc kafkaClient, c types.ClusterInfo) (clusterDetails, error) {
brokers, err := GetBrokers(svc, *c.ClusterArn)
if err != nil {
fmt.Println(err)
return ClusterDetails{}, err
return clusterDetails{}, err
}

cluster := ClusterDetails{
cluster := clusterDetails{
ClusterName: *c.ClusterName,
ClusterArn: *c.ClusterArn,
Brokers: brokers,
Expand All @@ -120,34 +120,29 @@ func BuildClusterDetails(svc kafkaClient, c types.ClusterInfo) (ClusterDetails,
return cluster, nil
}

func GetStaticConfigs(svc kafkaClient) []PrometheusStaticConfig {
clusters, _ := GetClusters(svc)
staticConfigs := []PrometheusStaticConfig{}

for _, cluster := range clusters.ClusterInfoList {
clusterDetails, _ := BuildClusterDetails(svc, cluster)
if !clusterDetails.JmxExporter && !clusterDetails.NodeExporter {
continue
}
staticConfigs = append(staticConfigs, clusterDetails.StaticConfig())
}
return staticConfigs
}

func main() {
flag.Parse()

cfg, _ := config.LoadDefaultConfig(context.TODO())
client := kafka.NewFromConfig(cfg)
work := func() {
result, err := GetClusters(client)
if err != nil {
fmt.Println(err)
return
}

var clusters []ClusterDetails
for _, r := range result.ClusterInfoList {
c, err := BuildClusterDetails(client, r)
if err != nil {
fmt.Println(err)
return
}
clusters = append(clusters, c)
}

var staticConfigs []PrometheusStaticConfig
for _, c := range clusters {
info := c.StaticConfig()
staticConfigs = append(staticConfigs, info)
}
work := func() {

staticConfigs := GetStaticConfigs(client)
m, err := yaml.Marshal(staticConfigs)
if err != nil {
fmt.Println(err)
Expand Down
154 changes: 154 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package main

import (
"context"
"fmt"
"reflect"
"strings"
"testing"

"github.com/aws/aws-sdk-go-v2/service/kafka"
"github.com/aws/aws-sdk-go-v2/service/kafka/types"
)

type mockCluster struct {
brokerCount int
clusterName string
jmxExporter bool
nodeExporter bool
}

type mockKafkaClient struct{ clusters map[string]mockCluster }

func (m mockKafkaClient) ListClusters(ctx context.Context, params *kafka.ListClustersInput, optFns ...func(*kafka.Options)) (*kafka.ListClustersOutput, error) {
var clusterInfoList []types.ClusterInfo
for arn, cluster := range m.clusters {
cArn := arn
cCluster := cluster
clusterInfoList = append(clusterInfoList, types.ClusterInfo{
ClusterArn: &cArn,
ClusterName: &cCluster.clusterName,
OpenMonitoring: &types.OpenMonitoring{
Prometheus: &types.Prometheus{
JmxExporter: &types.JmxExporter{
EnabledInBroker: cCluster.jmxExporter,
},
NodeExporter: &types.NodeExporter{
EnabledInBroker: cCluster.nodeExporter,
},
},
},
})
}
output := kafka.ListClustersOutput{
ClusterInfoList: clusterInfoList,
}
return &output, nil
}

func (m mockKafkaClient) GetBootstrapBrokers(ctx context.Context, params *kafka.GetBootstrapBrokersInput, optFns ...func(*kafka.Options)) (*kafka.GetBootstrapBrokersOutput, error) {
cluster := m.clusters[*params.ClusterArn]
var brokers []string

for i := 1; i <= cluster.brokerCount; {
brokers = append(brokers, fmt.Sprintf("b-%v.broker.com:9002", i))
i++
}

brokerString := strings.Join(brokers, ",")
output := kafka.GetBootstrapBrokersOutput{
BootstrapBrokerString: &brokerString,
}
return &output, nil
}

func TestGetStaticConfigs(t *testing.T) {
t.Run("OneClusterTwoBrokersFullMonitoring", func(t *testing.T) {
var client mockKafkaClient
client.clusters = make(map[string]mockCluster)
client.clusters["arn:::my-cluster"] = mockCluster{2, "my-cluster", true, true}

got := GetStaticConfigs(client)
want := []PrometheusStaticConfig{
{
Targets: []string{
"b-1.broker.com:11001",
"b-1.broker.com:11002",
"b-2.broker.com:11001",
"b-2.broker.com:11002",
},
Labels: labels{
Job: "msk-my-cluster",
ClusterName: "my-cluster",
ClusterArn: "arn:::my-cluster",
},
},
}
if !reflect.DeepEqual(got, want) {
t.Errorf("got %s want %s", got, want)
}
})

t.Run("TwoClusterTwoBrokersFullAndLimitedMonitoring", func(t *testing.T) {
var client mockKafkaClient
client.clusters = make(map[string]mockCluster)
client.clusters["arn:::my-cluster"] = mockCluster{2, "my-cluster", true, true}
client.clusters["arn:::my-other-cluster"] = mockCluster{2, "my-other-cluster", true, false}

got := GetStaticConfigs(client)
want := []PrometheusStaticConfig{
{
Targets: []string{
"b-1.broker.com:11001",
"b-1.broker.com:11002",
"b-2.broker.com:11001",
"b-2.broker.com:11002",
},
Labels: labels{
Job: "msk-my-cluster",
ClusterName: "my-cluster",
ClusterArn: "arn:::my-cluster",
},
},
{
Targets: []string{
"b-1.broker.com:11001",
"b-2.broker.com:11001",
},
Labels: labels{
Job: "msk-my-other-cluster",
ClusterName: "my-other-cluster",
ClusterArn: "arn:::my-other-cluster",
},
},
}
if !reflect.DeepEqual(got, want) {
t.Errorf("got %s want %s", got, want)
}
})

t.Run("NoMonitoringEnabled", func(t *testing.T) {
var client mockKafkaClient
client.clusters = make(map[string]mockCluster)
client.clusters["arn:::my-cluster"] = mockCluster{2, "my-cluster", false, false}

got := GetStaticConfigs(client)
want := []PrometheusStaticConfig{}

if !reflect.DeepEqual(got, want) {
t.Errorf("got %s want %s", got, want)
}
})

t.Run("NoClusters", func(t *testing.T) {
var client mockKafkaClient
client.clusters = make(map[string]mockCluster)

got := GetStaticConfigs(client)
want := []PrometheusStaticConfig{}
if !reflect.DeepEqual(got, want) {
t.Errorf("got %s want %s", got, want)
}
})

}

0 comments on commit a086bc4

Please sign in to comment.