Skip to content

Commit

Permalink
Prevent aggregate dimension duplicates. (#856)
Browse files Browse the repository at this point in the history
  • Loading branch information
jefchien authored Sep 21, 2023
1 parent 6b25891 commit 50b4ade
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 145 deletions.
46 changes: 29 additions & 17 deletions plugins/outputs/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pmetric"
"golang.org/x/exp/maps"

configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws"
"github.com/aws/amazon-cloudwatch-agent/handlers"
Expand Down Expand Up @@ -502,51 +503,62 @@ func BuildDimensions(tagMap map[string]string) []*cloudwatch.Dimension {
return dimensions
}

func (c *CloudWatch) ProcessRollup(rawDimension []*cloudwatch.Dimension) [][]*cloudwatch.Dimension {
// ProcessRollup creates the dimension sets based on the dimensions available in the original metric.
func (c *CloudWatch) ProcessRollup(rawDimensions []*cloudwatch.Dimension) [][]*cloudwatch.Dimension {
rawDimensionMap := map[string]string{}
for _, v := range rawDimension {
for _, v := range rawDimensions {
rawDimensionMap[*v.Name] = *v.Value
}
targetDimensionsList := c.config.RollupDimensions
fullDimensionsList := [][]*cloudwatch.Dimension{rawDimension}
fullDimensionsList := [][]*cloudwatch.Dimension{rawDimensions}
for _, targetDimensions := range targetDimensionsList {
i := 0
// skip if target dimensions count is same or more than the original metric.
// cannot have dimensions that do not exist in the original metric.
if len(targetDimensions) >= len(rawDimensions) {
continue
}
count := 0
extraDimensions := make([]*cloudwatch.Dimension, len(targetDimensions))
for _, targetDimensionKey := range targetDimensions {
if val, ok := rawDimensionMap[targetDimensionKey]; !ok {
break
} else {
extraDimensions[i] = &cloudwatch.Dimension{
extraDimensions[count] = &cloudwatch.Dimension{
Name: aws.String(targetDimensionKey),
Value: aws.String(val),
}
}
i += 1
count++
}
if i == len(targetDimensions) && !reflect.DeepEqual(rawDimension, extraDimensions) {
if count == len(targetDimensions) {
fullDimensionsList = append(fullDimensionsList, extraDimensions)
}
}
return fullDimensionsList
}

// GetUniqueRollupList filters out duplicate dimensions within the sets and filters
// duplicate sets.
func GetUniqueRollupList(inputLists [][]string) [][]string {
uniqueLists := [][]string{}
if len(inputLists) > 0 {
uniqueLists = append(uniqueLists, inputLists[0])
}
var uniqueSets []collections.Set[string]
for _, inputList := range inputLists {
inputSet := collections.NewSet(inputList...)
count := 0
for _, u := range uniqueLists {
if reflect.DeepEqual(inputList, u) {
for _, uniqueSet := range uniqueSets {
if reflect.DeepEqual(inputSet, uniqueSet) {
break
}
count += 1
if count == len(uniqueLists) {
uniqueLists = append(uniqueLists, inputList)
}
count++
}
if count == len(uniqueSets) {
uniqueSets = append(uniqueSets, inputSet)
}
}
uniqueLists := make([][]string, len(uniqueSets))
for i, uniqueSet := range uniqueSets {
uniqueLists[i] = maps.Keys(uniqueSet)
sort.Strings(uniqueLists[i])
}
log.Printf("I! cloudwatch: get unique roll up list %v", uniqueLists)
return uniqueLists
}
Expand Down
224 changes: 96 additions & 128 deletions plugins/outputs/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ func TestProcessRollup(t *testing.T) {
publisher.NewNonBlockingFifoQueue(10),
10,
2*time.Second,
cw.WriteToCloudWatch)
cw.config.RollupDimensions = [][]string{{"d1", "d2"}, {"d1"}, {}, {"d4"}}
cw.WriteToCloudWatch,
)

rawDimension := []*cloudwatch.Dimension{
testRawDimensions := []*cloudwatch.Dimension{
{
Name: aws.String("d1"),
Value: aws.String("v1"),
Expand All @@ -118,122 +118,74 @@ func TestProcessRollup(t *testing.T) {
},
}

actualDimensionList := cw.ProcessRollup(rawDimension)
expectedDimensionList := [][]*cloudwatch.Dimension{
{
{
Name: aws.String("d1"),
Value: aws.String("v1"),
},
{
Name: aws.String("d2"),
Value: aws.String("v2"),
},
{
Name: aws.String("d3"),
Value: aws.String("v3"),
testCases := map[string]struct {
rollupDimensions [][]string
rawDimensions []*cloudwatch.Dimension
want [][]*cloudwatch.Dimension
}{
"WithSimpleRollup": {
rollupDimensions: [][]string{{"d1", "d2"}, {"d1"}, {}, {"d4"}},
rawDimensions: testRawDimensions,
want: [][]*cloudwatch.Dimension{
testRawDimensions,
{
{
Name: aws.String("d1"),
Value: aws.String("v1"),
},
{
Name: aws.String("d2"),
Value: aws.String("v2"),
},
},
{
{
Name: aws.String("d1"),
Value: aws.String("v1"),
},
},
{},
},
},
{
{
Name: aws.String("d1"),
Value: aws.String("v1"),
},
{
Name: aws.String("d2"),
Value: aws.String("v2"),
},
"WithNoRollupConfig": {
rollupDimensions: [][]string{},
rawDimensions: testRawDimensions,
want: [][]*cloudwatch.Dimension{testRawDimensions},
},
{
{
Name: aws.String("d1"),
Value: aws.String("v1"),
},
"WithNoRawDimensions": {
rollupDimensions: [][]string{{"d1", "d2"}, {"d1"}, {}},
rawDimensions: []*cloudwatch.Dimension{},
want: [][]*cloudwatch.Dimension{{}},
},
{},
}
assert.EqualValues(t, expectedDimensionList, actualDimensionList, "Unexpected dimension roll up list")

cw.config.RollupDimensions = [][]string{}
rawDimension = []*cloudwatch.Dimension{
{
Name: aws.String("d1"),
Value: aws.String("v1"),
"WithDuplicate/SameOrder": {
rollupDimensions: [][]string{{"d1", "d2", "d3"}},
rawDimensions: testRawDimensions,
want: [][]*cloudwatch.Dimension{testRawDimensions},
},
{
Name: aws.String("d2"),
Value: aws.String("v2"),
"WithDuplicate/DifferentOrder": {
rollupDimensions: [][]string{{"d2", "d1", "d3"}},
rawDimensions: testRawDimensions,
want: [][]*cloudwatch.Dimension{testRawDimensions},
},
{
Name: aws.String("d3"),
Value: aws.String("v3"),
"WithSameLength/DifferentNames": {
rollupDimensions: [][]string{{"d1", "d3", "d4"}},
rawDimensions: testRawDimensions,
want: [][]*cloudwatch.Dimension{testRawDimensions},
},
}

actualDimensionList = cw.ProcessRollup(rawDimension)
expectedDimensionList = [][]*cloudwatch.Dimension{
{
{
Name: aws.String("d1"),
Value: aws.String("v1"),
},
{
Name: aws.String("d2"),
Value: aws.String("v2"),
},
{
Name: aws.String("d3"),
Value: aws.String("v3"),
},
"WithExtraDimensions": {
rollupDimensions: [][]string{{"d1", "d2", "d3", "d4"}},
rawDimensions: testRawDimensions,
want: [][]*cloudwatch.Dimension{testRawDimensions},
},
}
assert.EqualValues(t, expectedDimensionList, actualDimensionList, "Unexpected dimension roll up list without rollup setting")

cw.config.RollupDimensions = [][]string{{"d1", "d2"}, {"d1"}, {}}
rawDimension = []*cloudwatch.Dimension{}

actualDimensionList = cw.ProcessRollup(rawDimension)
expectedDimensionList = [][]*cloudwatch.Dimension{
{},
}
assert.EqualValues(t, expectedDimensionList, actualDimensionList, "Unexpected dimension roll up list with no raw dimensions")

cw.config.RollupDimensions = [][]string{{"d1", "d2", "d3"}}
rawDimension = []*cloudwatch.Dimension{
{
Name: aws.String("d1"),
Value: aws.String("v1"),
},
{
Name: aws.String("d2"),
Value: aws.String("v2"),
},
{
Name: aws.String("d3"),
Value: aws.String("v3"),
},
}

actualDimensionList = cw.ProcessRollup(rawDimension)
expectedDimensionList = [][]*cloudwatch.Dimension{
{
{
Name: aws.String("d1"),
Value: aws.String("v1"),
},
{
Name: aws.String("d2"),
Value: aws.String("v2"),
},
{
Name: aws.String("d3"),
Value: aws.String("v3"),
},
},
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
cw.config.RollupDimensions = testCase.rollupDimensions
got := cw.ProcessRollup(testCase.rawDimensions)
assert.EqualValues(t, testCase.want, got, "Unexpected dimension roll up list")
})
}
assert.EqualValues(t, expectedDimensionList, actualDimensionList,
"Unexpected dimension roll up list with duplicate roll up")
cw.Shutdown(context.Background())
assert.NoError(t, cw.Shutdown(context.Background()))
}

func TestBuildMetricDatumDropUnsupported(t *testing.T) {
Expand All @@ -258,25 +210,41 @@ func TestBuildMetricDatumDropUnsupported(t *testing.T) {
}

func TestGetUniqueRollupList(t *testing.T) {
inputLists := [][]string{{"d1"}, {"d1"}, {"d2"}, {"d1"}}
actualLists := GetUniqueRollupList(inputLists)
expectedLists := [][]string{{"d1"}, {"d2"}}
assert.EqualValues(t, expectedLists, actualLists, "Duplicate list showed up")

inputLists = [][]string{{"d1", "d2", ""}}
actualLists = GetUniqueRollupList(inputLists)
expectedLists = [][]string{{"d1", "d2", ""}}
assert.EqualValues(t, expectedLists, actualLists, "Unique list should be same with input list")

inputLists = [][]string{{}, {}}
actualLists = GetUniqueRollupList(inputLists)
expectedLists = [][]string{{}}
assert.EqualValues(t, expectedLists, actualLists, "Unique list failed on empty list")

inputLists = [][]string{}
actualLists = GetUniqueRollupList(inputLists)
expectedLists = [][]string{}
assert.EqualValues(t, expectedLists, actualLists, "Unique list result should be empty")
testCases := map[string]struct {
input [][]string
want [][]string
}{
"WithEmpty": {
input: [][]string{},
want: [][]string{},
},
"WithSimple": {
input: [][]string{{"d1", "d2", ""}},
want: [][]string{{"", "d1", "d2"}},
},
"WithDuplicates/NoDimension": {
input: [][]string{{}, {}},
want: [][]string{{}},
},
"WithDuplicates/SingleDimension": {
input: [][]string{{"d1"}, {"d1"}, {"d2"}, {"d1"}},
want: [][]string{{"d1"}, {"d2"}},
},
"WithDuplicates/DifferentOrder": {
input: [][]string{{"d2", "d1", "d3"}, {"d3", "d1", "d2"}, {"d3", "d2", "d1"}},
want: [][]string{{"d1", "d2", "d3"}},
},
"WithDuplicates/WithinSets": {
input: [][]string{{"d1", "d1", "d2"}, {"d1", "d1"}, {"d2", "d1"}, {"d1"}},
want: [][]string{{"d1", "d2"}, {"d1"}},
},
}
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
got := GetUniqueRollupList(testCase.input)
assert.EqualValues(t, testCase.want, got)
})
}
}

func TestIsDropping(t *testing.T) {
Expand Down

0 comments on commit 50b4ade

Please sign in to comment.