Skip to content

Commit

Permalink
Revert "Do not send AccountID as KeyAttribute for entities in PutLogE…
Browse files Browse the repository at this point in the history
…vents requests" and add unit tests (#1411)

Co-authored-by: zhihonl <[email protected]>
  • Loading branch information
nathalapooja and zhihonl authored Nov 6, 2024
1 parent 1e682da commit a19df6f
Show file tree
Hide file tree
Showing 11 changed files with 285 additions and 10 deletions.
4 changes: 2 additions & 2 deletions extension/entitystore/ec2Info.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (ei *EC2Info) setInstanceIDAccountID() error {
for {
metadataDoc, err := ei.metadataProvider.Get(context.Background())
if err != nil {
ei.logger.Warn("Failed to get Instance ID / Account ID through metadata provider", zap.Error(err))
ei.logger.Debug("Failed to get Instance ID / Account ID through metadata provider", zap.Error(err))
wait := time.NewTimer(1 * time.Minute)
select {
case <-ei.done:
Expand Down Expand Up @@ -122,7 +122,7 @@ func (ei *EC2Info) setAutoScalingGroup() error {
}

if err := ei.retrieveAsgName(); err != nil {
ei.logger.Warn("Unable to fetch instance tags with imds", zap.Int("retry", retry), zap.Error(err))
ei.logger.Debug("Unable to fetch instance tags with imds", zap.Int("retry", retry), zap.Error(err))
} else {
ei.logger.Debug("Retrieval of auto-scaling group tags succeeded")
return nil
Expand Down
5 changes: 4 additions & 1 deletion extension/entitystore/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ func (e *EntityStore) CreateLogFileEntity(logFileGlob LogFileGlob, logGroupName
keyAttributes := e.createServiceKeyAttributes(serviceAttr)
attributeMap := e.createAttributeMap()
addNonEmptyToMap(attributeMap, ServiceNameSourceKey, serviceAttr.ServiceNameSource)

if _, ok := keyAttributes[entityattributes.AwsAccountId]; !ok {
return nil
}
return &cloudwatchlogs.Entity{
KeyAttributes: keyAttributes,
Attributes: attributeMap,
Expand Down Expand Up @@ -235,6 +237,7 @@ func (e *EntityStore) createServiceKeyAttributes(serviceAttr ServiceAttribute) m
}
addNonEmptyToMap(serviceKeyAttr, entityattributes.ServiceName, serviceAttr.ServiceName)
addNonEmptyToMap(serviceKeyAttr, entityattributes.DeploymentEnvironment, serviceAttr.Environment)
addNonEmptyToMap(serviceKeyAttr, entityattributes.AwsAccountId, e.ec2Info.GetAccountID())
return serviceKeyAttr
}

Expand Down
4 changes: 3 additions & 1 deletion extension/entitystore/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ func TestEntityStore_createServiceKeyAttributes(t *testing.T) {

func TestEntityStore_createLogFileRID(t *testing.T) {
instanceId := "i-abcd1234"
accountId := "123456789012"
glob := LogFileGlob("glob")
group := LogGroupName("group")
serviceAttr := ServiceAttribute{
Expand All @@ -321,7 +322,7 @@ func TestEntityStore_createLogFileRID(t *testing.T) {
sp.On("logFileServiceAttribute", glob, group).Return(serviceAttr)
e := EntityStore{
mode: config.ModeEC2,
ec2Info: EC2Info{InstanceID: instanceId},
ec2Info: EC2Info{InstanceID: instanceId, AccountID: accountId},
serviceprovider: sp,
nativeCredential: &session.Session{},
}
Expand All @@ -333,6 +334,7 @@ func TestEntityStore_createLogFileRID(t *testing.T) {
entityattributes.DeploymentEnvironment: aws.String("test-environment"),
entityattributes.ServiceName: aws.String("test-service"),
entityattributes.EntityType: aws.String(Service),
entityattributes.AwsAccountId: aws.String(accountId),
},
Attributes: map[string]*string{
InstanceIDKey: aws.String(instanceId),
Expand Down
61 changes: 55 additions & 6 deletions plugins/outputs/cloudwatchlogs/pusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,26 @@ import (

// mockLogSrc is a test double that embeds logs.LogSrc and supplies its own
// Entity method.
type mockLogSrc struct {
logs.LogSrc
// returnEmpty makes Entity return nil instead of the canned entity.
returnEmpty bool
}

// Entity returns a canned entity (EC2 platform attributes plus service key
// attributes) for use in pusher tests, or nil when m.returnEmpty is set.
func (m *mockLogSrc) Entity() *cloudwatchlogs.Entity {
return &cloudwatchlogs.Entity{
entity := &cloudwatchlogs.Entity{
Attributes: map[string]*string{
"PlatformType": aws.String("AWS::EC2"),
"EC2.InstanceId": aws.String("i-123456789"),
"EC2.AutoScalingGroup": aws.String("test-group"),
},
KeyAttributes: map[string]*string{
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
"AwsAccountId": aws.String("123456789"),
},
}
// The canned entity is built unconditionally; returnEmpty only changes
// what is handed back to the caller.
if m.returnEmpty {
return nil
}
return entity
}

var wg sync.WaitGroup
Expand Down Expand Up @@ -103,7 +109,7 @@ func (e evtMock) Done() {
}
}

func TestAddSingleEvent(t *testing.T) {
func TestAddSingleEvent_WithAccountId(t *testing.T) {
var s svcMock
called := false
nst := "NEXT_SEQ_TOKEN"
Expand All @@ -114,8 +120,9 @@ func TestAddSingleEvent(t *testing.T) {
"EC2.AutoScalingGroup": aws.String("test-group"),
},
KeyAttributes: map[string]*string{
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
"AwsAccountId": aws.String("123456789"),
},
}

Expand Down Expand Up @@ -155,6 +162,48 @@ func TestAddSingleEvent(t *testing.T) {
wg.Wait()
}

// TestAddSingleEvent_WithoutAccountId verifies that when the log source
// yields no entity (mockLogSrc with returnEmpty set), a single event is still
// flushed through PutLogEvents after the flush timeout, that the request
// carries a nil Entity, and that the pusher records the returned
// NextSequenceToken.
func TestAddSingleEvent_WithoutAccountId(t *testing.T) {
	var s svcMock
	called := false
	nst := "NEXT_SEQ_TOKEN"

	s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
		called = true

		if in.SequenceToken != nil {
			t.Errorf("PutLogEvents called with wrong sequenceToken, first call should not provide any token")
		}

		if *in.LogGroupName != "G" || *in.LogStreamName != "S" {
			t.Errorf("PutLogEvents called with wrong group and stream: %v/%v", *in.LogGroupName, *in.LogStreamName)
		}

		// Check the count on its own so an empty batch is reported as a
		// failure instead of panicking on LogEvents[0] while formatting
		// the error message.
		if len(in.LogEvents) != 1 {
			t.Errorf("PutLogEvents called with %d events, expected exactly 1", len(in.LogEvents))
		} else if *in.LogEvents[0].Message != "MSG" {
			t.Errorf("PutLogEvents called with incorrect message, got: '%v'", *in.LogEvents[0].Message)
		}

		// Use t.Errorf rather than require.Nil here: this callback appears
		// to run on the pusher's goroutine (the test observes called ==
		// false right after AddEvent), and FailNow must only be called
		// from the goroutine running the test.
		if in.Entity != nil {
			t.Errorf("PutLogEvents called with an entity, expected nil: %v", in.Entity)
		}

		return &cloudwatchlogs.PutLogEventsOutput{
			NextSequenceToken: &nst,
		}, nil
	}

	stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout)
	p.logSrc = &mockLogSrc{returnEmpty: true}

	p.AddEvent(evtMock{"MSG", time.Now(), nil})
	require.False(t, called, "PutLogEvents has been called too fast, it should wait until FlushTimeout.")

	// Shorten the flush timeout so the pending event is sent promptly.
	p.FlushTimeout = 10 * time.Millisecond
	p.resetFlushTimer()

	time.Sleep(3 * time.Second)
	require.True(t, called, "PutLogEvents has not been called after FlushTimeout has been reached.")

	// The original require.NotNil(t, nst, *p.sequenceToken, ...) could never
	// fail: nst is a non-nil string and the token was treated as a message
	// argument. Guard the pointer, then compare the values.
	require.NotNil(t, p.sequenceToken, "Pusher did not capture the NextSequenceToken")
	require.Equal(t, nst, *p.sequenceToken, "Pusher did not capture the NextSequenceToken")

	close(stop)
	wg.Wait()
}

func TestStopPusherWouldDoFinalSend(t *testing.T) {
var s svcMock
called := false
Expand Down
30 changes: 30 additions & 0 deletions translator/tocwconfig/sampleConfig/statsd_ecs_config.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[agent]
collection_jitter = "0s"
debug = false
flush_interval = "1s"
flush_jitter = "0s"
hostname = "fake-host-name"
interval = "60s"
logfile = ""
logtarget = "lumberjack"
metric_batch_size = 1000
metric_buffer_limit = 10000
omit_hostname = true
precision = ""
quiet = false
round_interval = false

[inputs]

[[inputs.statsd]]
allowed_pending_messages = 10000
interval = "10s"
parse_data_dog_tags = true
service_address = ":8125"
[inputs.statsd.tags]
"aws:StorageResolution" = "true"

[outputs]

[[outputs.cloudwatch]]

10 changes: 10 additions & 0 deletions translator/tocwconfig/sampleConfig/statsd_ecs_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"metrics": {
"metrics_collected": {
"statsd": {
"metrics_aggregation_interval": 0,
"allowed_pending_messages": 10000
}
}
}
}
50 changes: 50 additions & 0 deletions translator/tocwconfig/sampleConfig/statsd_ecs_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
exporters:
awscloudwatch:
force_flush_interval: 1m0s
max_datums_per_call: 1000
max_values_per_datum: 150
middleware: agenthealth/metrics
namespace: CWAgent
region: us-west-2
resource_to_telemetry_conversion:
enabled: true
extensions:
agenthealth/metrics:
is_usage_data_enabled: true
stats:
operations:
- PutMetricData
usage_flags:
mode: EC2
region_type: ACJ
receivers:
telegraf_statsd:
collection_interval: 10s
initial_delay: 1s
timeout: 0s
service:
extensions:
- agenthealth/metrics
pipelines:
metrics/hostCustomMetrics:
exporters:
- awscloudwatch
processors: []
receivers:
- telegraf_statsd
telemetry:
logs:
development: false
disable_caller: false
disable_stacktrace: false
encoding: console
level: info
sampling:
enabled: true
initial: 2
thereafter: 500
tick: 10s
metrics:
address: ""
level: None
traces: {}
29 changes: 29 additions & 0 deletions translator/tocwconfig/sampleConfig/statsd_eks_config.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[agent]
collection_jitter = "0s"
debug = false
flush_interval = "1s"
flush_jitter = "0s"
hostname = "host_name_from_env"
interval = "60s"
logfile = ""
logtarget = "lumberjack"
metric_batch_size = 1000
metric_buffer_limit = 10000
omit_hostname = false
precision = ""
quiet = false
round_interval = false

[inputs]

[[inputs.statsd]]
allowed_pending_messages = 10000
interval = "10s"
parse_data_dog_tags = true
service_address = ":8125"
[inputs.statsd.tags]
"aws:StorageResolution" = "true"

[outputs]

[[outputs.cloudwatch]]
10 changes: 10 additions & 0 deletions translator/tocwconfig/sampleConfig/statsd_eks_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"metrics": {
"metrics_collected": {
"statsd": {
"metrics_aggregation_interval": 0,
"allowed_pending_messages": 10000
}
}
}
}
61 changes: 61 additions & 0 deletions translator/tocwconfig/sampleConfig/statsd_eks_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
exporters:
awscloudwatch:
force_flush_interval: 1m0s
max_datums_per_call: 1000
max_values_per_datum: 150
middleware: agenthealth/metrics
namespace: CWAgent
region: us-west-2
resource_to_telemetry_conversion:
enabled: true
extensions:
agenthealth/metrics:
is_usage_data_enabled: true
stats:
operations:
- PutMetricData
usage_flags:
mode: EKS
region_type: ACJ
entitystore:
mode: ec2
region: us-west-2
kubernetes_mode: EKS
server:
listen_addr: :4311
tls_ca_path: "/etc/amazon-cloudwatch-observability-agent-client-cert/tls-ca.crt"
tls_cert_path: "/etc/amazon-cloudwatch-observability-agent-server-cert/server.crt"
tls_key_path: "/etc/amazon-cloudwatch-observability-agent-server-cert/server.key"
receivers:
telegraf_statsd:
collection_interval: 10s
initial_delay: 1s
timeout: 0s
service:
extensions:
- agenthealth/metrics
- entitystore
- server
pipelines:
metrics/hostCustomMetrics:
exporters:
- awscloudwatch
processors: []
receivers:
- telegraf_statsd
telemetry:
logs:
development: false
disable_caller: false
disable_stacktrace: false
encoding: console
level: info
sampling:
enabled: true
initial: 2
thereafter: 500
tick: 10s
metrics:
address: ""
level: None
traces: {}
Loading

0 comments on commit a19df6f

Please sign in to comment.