Skip to content

Commit

Permalink
Fixing CloudWatch entity components bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
zhihonl committed Oct 29, 2024
1 parent f7df406 commit 161cd5b
Show file tree
Hide file tree
Showing 22 changed files with 283 additions and 766 deletions.
45 changes: 32 additions & 13 deletions extension/entitystore/ec2Info.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"strings"
"sync"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -36,6 +37,7 @@ type EC2Info struct {
metadataProvider ec2metadataprovider.MetadataProvider
logger *zap.Logger
done chan struct{}
mutex sync.RWMutex
}

func (ei *EC2Info) initEc2Info() {
Expand All @@ -47,7 +49,24 @@ func (ei *EC2Info) initEc2Info() {
return
}
ei.logger.Debug("Finished initializing EC2Info")
ei.ignoreInvalidFields()
}

func (ei *EC2Info) GetInstanceID() string {
ei.mutex.RLock()
defer ei.mutex.RUnlock()
return ei.InstanceID
}

func (ei *EC2Info) GetAccountID() string {
ei.mutex.RLock()
defer ei.mutex.RUnlock()
return ei.AccountID
}

func (ei *EC2Info) GetAutoScalingGroup() string {
ei.mutex.RLock()
defer ei.mutex.RUnlock()
return ei.AutoScalingGroup
}

func (ei *EC2Info) setInstanceIDAccountID() error {
Expand All @@ -65,8 +84,14 @@ func (ei *EC2Info) setInstanceIDAccountID() error {
}
}
ei.logger.Debug("Successfully retrieved Instance ID and Account ID")
ei.mutex.Lock()
ei.InstanceID = metadataDoc.InstanceID
if idLength := len(ei.InstanceID); idLength > instanceIdSizeMax {
ei.logger.Warn("InstanceId length exceeds characters limit and will be ignored", zap.Int("length", idLength), zap.Int("character limit", instanceIdSizeMax))
ei.InstanceID = ""
}
ei.AccountID = metadataDoc.AccountID
ei.mutex.Unlock()
return nil
}
}
Expand Down Expand Up @@ -116,7 +141,13 @@ func (ei *EC2Info) retrieveAsgName() error {
ei.logger.Error("Failed to get AutoScalingGroup through metadata provider", zap.Error(err))
} else {
ei.logger.Debug("AutoScalingGroup retrieved through IMDS")
ei.mutex.Lock()
ei.AutoScalingGroup = asg
if asgLength := len(ei.AutoScalingGroup); asgLength > autoScalingGroupSizeMax {
ei.logger.Warn("AutoScalingGroup length exceeds characters limit and will be ignored", zap.Int("length", asgLength), zap.Int("character limit", autoScalingGroupSizeMax))
ei.AutoScalingGroup = ""
}
ei.mutex.Unlock()
}
}
return nil
Expand All @@ -130,15 +161,3 @@ func newEC2Info(metadataProvider ec2metadataprovider.MetadataProvider, done chan
logger: logger,
}
}

func (ei *EC2Info) ignoreInvalidFields() {
if idLength := len(ei.InstanceID); idLength > instanceIdSizeMax {
ei.logger.Warn("InstanceId length exceeds characters limit and will be ignored", zap.Int("length", idLength), zap.Int("character limit", instanceIdSizeMax))
ei.InstanceID = ""
}

if asgLength := len(ei.AutoScalingGroup); asgLength > autoScalingGroupSizeMax {
ei.logger.Warn("AutoScalingGroup length exceeds characters limit and will be ignored", zap.Int("length", asgLength), zap.Int("character limit", autoScalingGroupSizeMax))
ei.AutoScalingGroup = ""
}
}
113 changes: 42 additions & 71 deletions extension/entitystore/ec2Info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,19 @@ var mockedInstanceIdentityDoc = &ec2metadata.EC2InstanceIdentityDocument{
ImageID: "ami-09edd32d9b0990d49",
}

var mockedInstanceIdentityDocWithLargeInstanceId = &ec2metadata.EC2InstanceIdentityDocument{
InstanceID: "i-01d2417c27a396e44394824728",
AccountID: "874389809020",
Region: "us-east-1",
InstanceType: "m5ad.large",
ImageID: "ami-09edd32d9b0990d49",
}

var (
tagVal3 = "ASG-1"
)

func TestSetInstanceIdAndRegion(t *testing.T) {
func TestSetInstanceIDAccountID(t *testing.T) {
type args struct {
metadataProvider ec2metadataprovider.MetadataProvider
}
Expand All @@ -50,6 +58,17 @@ func TestSetInstanceIdAndRegion(t *testing.T) {
AccountID: mockedInstanceIdentityDoc.AccountID,
},
},
{
name: "InstanceId too large",
args: args{
metadataProvider: &mockMetadataProvider{InstanceIdentityDocument: mockedInstanceIdentityDocWithLargeInstanceId},
},
wantErr: false,
want: EC2Info{
InstanceID: "",
AccountID: mockedInstanceIdentityDocWithLargeInstanceId.AccountID,
},
},
}
for _, tt := range tests {
logger, _ := zap.NewDevelopment()
Expand All @@ -61,8 +80,8 @@ func TestSetInstanceIdAndRegion(t *testing.T) {
if err := ei.setInstanceIDAccountID(); (err != nil) != tt.wantErr {
t.Errorf("setInstanceIDAccountID() error = %v, wantErr %v", err, tt.wantErr)
}
assert.Equal(t, tt.want.InstanceID, ei.InstanceID)
assert.Equal(t, tt.want.AccountID, ei.AccountID)
assert.Equal(t, tt.want.InstanceID, ei.GetInstanceID())
assert.Equal(t, tt.want.AccountID, ei.GetAccountID())
})
}
}
Expand Down Expand Up @@ -104,6 +123,23 @@ func TestRetrieveASGName(t *testing.T) {
AutoScalingGroup: tagVal3,
},
},
{
name: "AutoScalingGroup too large",
args: args{
metadataProvider: &mockMetadataProvider{
InstanceIdentityDocument: mockedInstanceIdentityDoc,
Tags: map[string]string{
"aws:autoscaling:groupName": strings.Repeat("a", 256),
"env": "test-env",
"name": "test-name",
}},
},

wantErr: false,
want: EC2Info{
AutoScalingGroup: "",
},
},
{
name: "Success IMDS tags call but no ASG",
args: args{
Expand All @@ -122,72 +158,7 @@ func TestRetrieveASGName(t *testing.T) {
if err := ei.retrieveAsgName(); (err != nil) != tt.wantErr {
t.Errorf("retrieveAsgName() error = %v, wantErr %v", err, tt.wantErr)
}
assert.Equal(t, tt.want.AutoScalingGroup, ei.AutoScalingGroup)
})
}
}

func TestIgnoreInvalidFields(t *testing.T) {
logger, _ := zap.NewDevelopment()
type want struct {
instanceId string
accountId string
autoScalingGroup string
}
tests := []struct {
name string
args *EC2Info
want want
}{
{
name: "Happy path",
args: &EC2Info{
InstanceID: "i-01d2417c27a396e44",
AccountID: "0123456789012",
AutoScalingGroup: "asg",
logger: logger,
},
want: want{
instanceId: "i-01d2417c27a396e44",
accountId: "0123456789012",
autoScalingGroup: "asg",
},
},
{
name: "InstanceId too large",
args: &EC2Info{
InstanceID: strings.Repeat("a", 20),
AccountID: "0123456789012",
AutoScalingGroup: "asg",
logger: logger,
},
want: want{
instanceId: "",
accountId: "0123456789012",
autoScalingGroup: "asg",
},
},
{
name: "AutoScalingGroup too large",
args: &EC2Info{
InstanceID: "i-01d2417c27a396e44",
AccountID: "0123456789012",
AutoScalingGroup: strings.Repeat("a", 256),
logger: logger,
},
want: want{
instanceId: "i-01d2417c27a396e44",
accountId: "0123456789012",
autoScalingGroup: "",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.args.ignoreInvalidFields()
assert.Equal(t, tt.want.instanceId, tt.args.InstanceID)
assert.Equal(t, tt.want.accountId, tt.args.AccountID)
assert.Equal(t, tt.want.autoScalingGroup, tt.args.AutoScalingGroup)
assert.Equal(t, tt.want.AutoScalingGroup, ei.GetAutoScalingGroup())
})
}
}
Expand Down Expand Up @@ -229,8 +200,8 @@ func TestLogMessageDoesNotIncludeResourceInfo(t *testing.T) {

logOutput := buf.String()
log.Println(logOutput)
assert.NotContains(t, logOutput, ei.InstanceID)
assert.NotContains(t, logOutput, ei.AutoScalingGroup)
assert.NotContains(t, logOutput, ei.GetInstanceID())
assert.NotContains(t, logOutput, ei.GetAutoScalingGroup())
})
}
}
10 changes: 9 additions & 1 deletion extension/entitystore/eksInfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ import (
"go.uber.org/zap"
)

const ttlDuration = 5 * time.Minute
const (
ttlDuration = 5 * time.Minute

// Agent server extension is mainly opened for FluentBit to
// consume data and FluentBit only caches 256 pods in memory
// so we will follow the same pattern
maxPodAssociationMapCapacity = 256
)

type ServiceEnvironment struct {
ServiceName string
Expand All @@ -28,6 +35,7 @@ func newEKSInfo(logger *zap.Logger) *eksInfo {
logger: logger,
podToServiceEnvMap: ttlcache.New[string, ServiceEnvironment](
ttlcache.WithTTL[string, ServiceEnvironment](ttlDuration),
ttlcache.WithCapacity[string, ServiceEnvironment](maxPodAssociationMapCapacity),
),
}
}
Expand Down
19 changes: 19 additions & 0 deletions extension/entitystore/eksInfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package entitystore

import (
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -104,6 +105,24 @@ func TestAddPodServiceEnvironmentMapping_TtlRefresh(t *testing.T) {
assert.Equal(t, 1, ei.podToServiceEnvMap.Len())
}

func TestAddPodServiceEnvironmentMapping_MaxCapacity(t *testing.T) {
logger := zap.NewNop()
ei := newEKSInfo(logger)

//adds new pod to service environment mapping
for i := 0; i < 300; i++ {
ei.AddPodServiceEnvironmentMapping("test-pod-"+strconv.Itoa(i), "test-service", "test-environment", "Instrumentation")
}
assert.Equal(t, maxPodAssociationMapCapacity, ei.podToServiceEnvMap.Len())
itemIndex := 299
ei.podToServiceEnvMap.Range(func(item *ttlcache.Item[string, ServiceEnvironment]) bool {
// Check if the item's value equals the target string
assert.Equal(t, item.Key(), "test-pod-"+strconv.Itoa(itemIndex))
itemIndex--
return true
})
}

func TestGetPodServiceEnvironmentMapping(t *testing.T) {
tests := []struct {
name string
Expand Down
23 changes: 10 additions & 13 deletions extension/entitystore/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,23 +94,27 @@ func (e *EntityStore) Start(ctx context.Context, host component.Host) error {
Profile: e.config.Profile,
Filename: e.config.Filename,
}
e.serviceprovider = newServiceProvider(e.mode, e.config.Region, &e.ec2Info, e.metadataprovider, getEC2Provider, ec2CredentialConfig, e.done, e.logger)
switch e.mode {
case config.ModeEC2:
e.ec2Info = *newEC2Info(e.metadataprovider, e.done, e.config.Region, e.logger)
go e.ec2Info.initEc2Info()
go e.serviceprovider.startServiceProvider()
}
if e.kubernetesMode != "" {
e.eksInfo = newEKSInfo(e.logger)
// Starting the ttl cache will automatically evict all expired pods from the map
go e.StartPodToServiceEnvironmentMappingTtlCache(e.done)
go e.StartPodToServiceEnvironmentMappingTtlCache()
}
e.serviceprovider = newServiceProvider(e.mode, e.config.Region, &e.ec2Info, e.metadataprovider, getEC2Provider, ec2CredentialConfig, e.done, e.logger)
go e.serviceprovider.startServiceProvider()
return nil
}

func (e *EntityStore) Shutdown(_ context.Context) error {
close(e.done)
if e.eksInfo != nil && e.eksInfo.podToServiceEnvMap != nil {
e.eksInfo.podToServiceEnvMap.Stop()
}
e.logger.Info("Pod to Service Environment Mapping TTL Cache stopped")
return nil
}

Expand Down Expand Up @@ -189,16 +193,9 @@ func (e *EntityStore) AddPodServiceEnvironmentMapping(podName string, serviceNam
}
}

func (e *EntityStore) StartPodToServiceEnvironmentMappingTtlCache(done chan struct{}) {
func (e *EntityStore) StartPodToServiceEnvironmentMappingTtlCache() {
if e.eksInfo != nil {
e.eksInfo.podToServiceEnvMap.Start()

// Start a goroutine to stop the cache when done channel is closed
go func() {
<-done
e.eksInfo.podToServiceEnvMap.Stop()
e.logger.Info("Pod to Service Environment Mapping TTL Cache stopped")
}()
}
}

Expand All @@ -215,8 +212,8 @@ func (e *EntityStore) createAttributeMap() map[string]*string {
attributeMap := make(map[string]*string)

if e.mode == config.ModeEC2 {
addNonEmptyToMap(attributeMap, InstanceIDKey, e.ec2Info.InstanceID)
addNonEmptyToMap(attributeMap, ASGKey, e.ec2Info.AutoScalingGroup)
addNonEmptyToMap(attributeMap, InstanceIDKey, e.ec2Info.GetInstanceID())
addNonEmptyToMap(attributeMap, ASGKey, e.ec2Info.GetAutoScalingGroup())
}
switch e.mode {
case config.ModeEC2:
Expand Down
Loading

0 comments on commit 161cd5b

Please sign in to comment.