From a19df6ffac4b8fda6dfd207df21aae9d40e0f460 Mon Sep 17 00:00:00 2001
From: POOJA REDDY NATHALA
Date: Wed, 6 Nov 2024 15:07:42 -0500
Subject: [PATCH] Revert "Do not send AccountID as KeyAttribute for entities in PutLogEvents requests" and add unit tests (#1411)

Co-authored-by: zhihonl <61301537+zhihonl@users.noreply.github.com>
---
Review notes (ignored by git am):
- TestAddSingleEvent_WithoutAccountId compares the captured sequence token
  with require.Equal. The previous require.NotNil(t, nst, *p.sequenceToken, ...)
  only checked that the local nst was non-nil, because testify treats every
  argument after the object as part of the failure message.
- Line breaks and indentation of this patch were reconstructed from a
  whitespace-collapsed copy; confirm context-line whitespace against the
  target tree before applying.

 extension/entitystore/ec2Info.go              |  4 +-
 extension/entitystore/extension.go            |  5 +-
 extension/entitystore/extension_test.go       |  4 +-
 plugins/outputs/cloudwatchlogs/pusher_test.go | 61 +++++++++++++++++--
 .../sampleConfig/statsd_ecs_config.conf       | 30 +++++++++
 .../sampleConfig/statsd_ecs_config.json       | 10 +++
 .../sampleConfig/statsd_ecs_config.yaml       | 50 +++++++++++++++
 .../sampleConfig/statsd_eks_config.conf       | 29 +++++++++
 .../sampleConfig/statsd_eks_config.json       | 10 +++
 .../sampleConfig/statsd_eks_config.yaml       | 61 +++++++++++++++++++
 translator/tocwconfig/tocwconfig_test.go      | 31 ++++++++++
 11 files changed, 285 insertions(+), 10 deletions(-)
 create mode 100644 translator/tocwconfig/sampleConfig/statsd_ecs_config.conf
 create mode 100644 translator/tocwconfig/sampleConfig/statsd_ecs_config.json
 create mode 100644 translator/tocwconfig/sampleConfig/statsd_ecs_config.yaml
 create mode 100644 translator/tocwconfig/sampleConfig/statsd_eks_config.conf
 create mode 100644 translator/tocwconfig/sampleConfig/statsd_eks_config.json
 create mode 100644 translator/tocwconfig/sampleConfig/statsd_eks_config.yaml

diff --git a/extension/entitystore/ec2Info.go b/extension/entitystore/ec2Info.go
index ae641ddbfd..cfb2eccba8 100644
--- a/extension/entitystore/ec2Info.go
+++ b/extension/entitystore/ec2Info.go
@@ -76,7 +76,7 @@ func (ei *EC2Info) setInstanceIDAccountID() error {
 	for {
 		metadataDoc, err := ei.metadataProvider.Get(context.Background())
 		if err != nil {
-			ei.logger.Warn("Failed to get Instance ID / Account ID through metadata provider", zap.Error(err))
+			ei.logger.Debug("Failed to get Instance ID / Account ID through metadata provider", zap.Error(err))
 			wait := time.NewTimer(1 * time.Minute)
 			select {
 			case <-ei.done:
@@ -122,7 +122,7 @@ func (ei *EC2Info) setAutoScalingGroup() error {
 		}
 
 		if err := ei.retrieveAsgName(); err != nil {
-			ei.logger.Warn("Unable to fetch instance tags with imds", zap.Int("retry", retry), zap.Error(err))
+			ei.logger.Debug("Unable to fetch instance tags with imds", zap.Int("retry", retry), zap.Error(err))
 		} else {
 			ei.logger.Debug("Retrieval of auto-scaling group tags succeeded")
 			return nil
diff --git a/extension/entitystore/extension.go b/extension/entitystore/extension.go
index 737223d7e5..a6af693cb3 100644
--- a/extension/entitystore/extension.go
+++ b/extension/entitystore/extension.go
@@ -150,7 +150,9 @@ func (e *EntityStore) CreateLogFileEntity(logFileGlob LogFileGlob, logGroupName
 	keyAttributes := e.createServiceKeyAttributes(serviceAttr)
 	attributeMap := e.createAttributeMap()
 	addNonEmptyToMap(attributeMap, ServiceNameSourceKey, serviceAttr.ServiceNameSource)
-
+	if _, ok := keyAttributes[entityattributes.AwsAccountId]; !ok {
+		return nil
+	}
 	return &cloudwatchlogs.Entity{
 		KeyAttributes: keyAttributes,
 		Attributes:    attributeMap,
@@ -235,6 +237,7 @@ func (e *EntityStore) createServiceKeyAttributes(serviceAttr ServiceAttribute) m
 	}
 	addNonEmptyToMap(serviceKeyAttr, entityattributes.ServiceName, serviceAttr.ServiceName)
 	addNonEmptyToMap(serviceKeyAttr, entityattributes.DeploymentEnvironment, serviceAttr.Environment)
+	addNonEmptyToMap(serviceKeyAttr, entityattributes.AwsAccountId, e.ec2Info.GetAccountID())
 	return serviceKeyAttr
 }
 
diff --git a/extension/entitystore/extension_test.go b/extension/entitystore/extension_test.go
index d28cfdcdfc..988df7dd54 100644
--- a/extension/entitystore/extension_test.go
+++ b/extension/entitystore/extension_test.go
@@ -310,6 +310,7 @@ func TestEntityStore_createServiceKeyAttributes(t *testing.T) {
 
 func TestEntityStore_createLogFileRID(t *testing.T) {
 	instanceId := "i-abcd1234"
+	accountId := "123456789012"
 	glob := LogFileGlob("glob")
 	group := LogGroupName("group")
 	serviceAttr := ServiceAttribute{
@@ -321,7 +322,7 @@ func TestEntityStore_createLogFileRID(t *testing.T) {
 	sp.On("logFileServiceAttribute", glob, group).Return(serviceAttr)
 	e := EntityStore{
 		mode:             config.ModeEC2,
-		ec2Info:          EC2Info{InstanceID: instanceId},
+		ec2Info:          EC2Info{InstanceID: instanceId, AccountID: accountId},
 		serviceprovider:  sp,
 		nativeCredential: &session.Session{},
 	}
@@ -333,6 +334,7 @@ func TestEntityStore_createLogFileRID(t *testing.T) {
 			entityattributes.DeploymentEnvironment: aws.String("test-environment"),
 			entityattributes.ServiceName:           aws.String("test-service"),
 			entityattributes.EntityType:            aws.String(Service),
+			entityattributes.AwsAccountId:          aws.String(accountId),
 		},
 		Attributes: map[string]*string{
 			InstanceIDKey: aws.String(instanceId),
diff --git a/plugins/outputs/cloudwatchlogs/pusher_test.go b/plugins/outputs/cloudwatchlogs/pusher_test.go
index f97b9f22ec..fc3957e5cc 100644
--- a/plugins/outputs/cloudwatchlogs/pusher_test.go
+++ b/plugins/outputs/cloudwatchlogs/pusher_test.go
@@ -27,20 +27,26 @@ import (
 
 type mockLogSrc struct {
 	logs.LogSrc
+	returnEmpty bool
 }
 
 func (m *mockLogSrc) Entity() *cloudwatchlogs.Entity {
-	return &cloudwatchlogs.Entity{
+	entity := &cloudwatchlogs.Entity{
 		Attributes: map[string]*string{
 			"PlatformType":         aws.String("AWS::EC2"),
 			"EC2.InstanceId":       aws.String("i-123456789"),
 			"EC2.AutoScalingGroup": aws.String("test-group"),
 		},
 		KeyAttributes: map[string]*string{
-			"Name":        aws.String("myService"),
-			"Environment": aws.String("myEnvironment"),
+			"Name":         aws.String("myService"),
+			"Environment":  aws.String("myEnvironment"),
+			"AwsAccountId": aws.String("123456789"),
 		},
 	}
+	if m.returnEmpty {
+		return nil
+	}
+	return entity
 }
 
 var wg sync.WaitGroup
@@ -103,7 +109,7 @@ func (e evtMock) Done() {
 	}
 }
 
-func TestAddSingleEvent(t *testing.T) {
+func TestAddSingleEvent_WithAccountId(t *testing.T) {
 	var s svcMock
 	called := false
 	nst := "NEXT_SEQ_TOKEN"
@@ -114,8 +120,9 @@ func TestAddSingleEvent(t *testing.T) {
 			"EC2.AutoScalingGroup": aws.String("test-group"),
 		},
 		KeyAttributes: map[string]*string{
-			"Name":        aws.String("myService"),
-			"Environment": aws.String("myEnvironment"),
+			"Name":         aws.String("myService"),
+			"Environment":  aws.String("myEnvironment"),
+			"AwsAccountId": aws.String("123456789"),
 		},
 	}
 
@@ -155,6 +162,48 @@ func TestAddSingleEvent(t *testing.T) {
 	wg.Wait()
 }
 
+func TestAddSingleEvent_WithoutAccountId(t *testing.T) {
+	var s svcMock
+	called := false
+	nst := "NEXT_SEQ_TOKEN"
+
+	s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
+		called = true
+
+		if in.SequenceToken != nil {
+			t.Errorf("PutLogEvents called with wrong sequenceToken, first call should not provide any token")
+		}
+
+		if *in.LogGroupName != "G" || *in.LogStreamName != "S" {
+			t.Errorf("PutLogEvents called with wrong group and stream: %v/%v", *in.LogGroupName, *in.LogStreamName)
+		}
+
+		if len(in.LogEvents) != 1 || *in.LogEvents[0].Message != "MSG" {
+			t.Errorf("PutLogEvents called with incorrect message, got: '%v'", *in.LogEvents[0].Message)
+		}
+		require.Nil(t, in.Entity)
+		return &cloudwatchlogs.PutLogEventsOutput{
+			NextSequenceToken: &nst,
+		}, nil
+	}
+
+	stop, p := testPreparation(-1, &s, 1*time.Hour, maxRetryTimeout)
+	p.logSrc = &mockLogSrc{returnEmpty: true}
+
+	p.AddEvent(evtMock{"MSG", time.Now(), nil})
+	require.False(t, called, "PutLogEvents has been called too fast, it should wait until FlushTimeout.")
+
+	p.FlushTimeout = 10 * time.Millisecond
+	p.resetFlushTimer()
+
+	time.Sleep(3 * time.Second)
+	require.True(t, called, "PutLogEvents has not been called after FlushTimeout has been reached.")
+	require.Equal(t, nst, *p.sequenceToken, "Pusher did not capture the NextSequenceToken")
+
+	close(stop)
+	wg.Wait()
+}
+
 func TestStopPusherWouldDoFinalSend(t *testing.T) {
 	var s svcMock
 	called := false
diff --git a/translator/tocwconfig/sampleConfig/statsd_ecs_config.conf b/translator/tocwconfig/sampleConfig/statsd_ecs_config.conf
new file mode 100644
index 0000000000..25d549c964
--- /dev/null
+++ b/translator/tocwconfig/sampleConfig/statsd_ecs_config.conf
@@ -0,0 +1,30 @@
+[agent]
+  collection_jitter = "0s"
+  debug = false
+  flush_interval = "1s"
+  flush_jitter = "0s"
+  hostname = "fake-host-name"
+  interval = "60s"
+  logfile = ""
+  logtarget = "lumberjack"
+  metric_batch_size = 1000
+  metric_buffer_limit = 10000
+  omit_hostname = true
+  precision = ""
+  quiet = false
+  round_interval = false
+
+[inputs]
+
+  [[inputs.statsd]]
+    allowed_pending_messages = 10000
+    interval = "10s"
+    parse_data_dog_tags = true
+    service_address = ":8125"
+    [inputs.statsd.tags]
+      "aws:StorageResolution" = "true"
+
+[outputs]
+
+  [[outputs.cloudwatch]]
+
diff --git a/translator/tocwconfig/sampleConfig/statsd_ecs_config.json b/translator/tocwconfig/sampleConfig/statsd_ecs_config.json
new file mode 100644
index 0000000000..0f2908f98b
--- /dev/null
+++ b/translator/tocwconfig/sampleConfig/statsd_ecs_config.json
@@ -0,0 +1,10 @@
+{
+  "metrics": {
+    "metrics_collected": {
+      "statsd": {
+        "metrics_aggregation_interval": 0,
+        "allowed_pending_messages": 10000
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/translator/tocwconfig/sampleConfig/statsd_ecs_config.yaml b/translator/tocwconfig/sampleConfig/statsd_ecs_config.yaml
new file mode 100644
index 0000000000..0e0e2da5a1
--- /dev/null
+++ b/translator/tocwconfig/sampleConfig/statsd_ecs_config.yaml
@@ -0,0 +1,50 @@
+exporters:
+    awscloudwatch:
+        force_flush_interval: 1m0s
+        max_datums_per_call: 1000
+        max_values_per_datum: 150
+        middleware: agenthealth/metrics
+        namespace: CWAgent
+        region: us-west-2
+        resource_to_telemetry_conversion:
+            enabled: true
+extensions:
+    agenthealth/metrics:
+        is_usage_data_enabled: true
+        stats:
+            operations:
+                - PutMetricData
+            usage_flags:
+                mode: EC2
+                region_type: ACJ
+receivers:
+    telegraf_statsd:
+        collection_interval: 10s
+        initial_delay: 1s
+        timeout: 0s
+service:
+    extensions:
+        - agenthealth/metrics
+    pipelines:
+        metrics/hostCustomMetrics:
+            exporters:
+                - awscloudwatch
+            processors: []
+            receivers:
+                - telegraf_statsd
+    telemetry:
+        logs:
+            development: false
+            disable_caller: false
+            disable_stacktrace: false
+            encoding: console
+            level: info
+            sampling:
+                enabled: true
+                initial: 2
+                thereafter: 500
+                tick: 10s
+        metrics:
+            address: ""
+            level: None
+        traces: {}
diff --git a/translator/tocwconfig/sampleConfig/statsd_eks_config.conf b/translator/tocwconfig/sampleConfig/statsd_eks_config.conf
new file mode 100644
index 0000000000..c1deb83dcb
--- /dev/null
+++ b/translator/tocwconfig/sampleConfig/statsd_eks_config.conf
@@ -0,0 +1,29 @@
+[agent]
+  collection_jitter = "0s"
+  debug = false
+  flush_interval = "1s"
+  flush_jitter = "0s"
+  hostname = "host_name_from_env"
+  interval = "60s"
+  logfile = ""
+  logtarget = "lumberjack"
+  metric_batch_size = 1000
+  metric_buffer_limit = 10000
+  omit_hostname = false
+  precision = ""
+  quiet = false
+  round_interval = false
+
+[inputs]
+
+  [[inputs.statsd]]
+    allowed_pending_messages = 10000
+    interval = "10s"
+    parse_data_dog_tags = true
+    service_address = ":8125"
+    [inputs.statsd.tags]
+      "aws:StorageResolution" = "true"
+
+[outputs]
+
+  [[outputs.cloudwatch]]
\ No newline at end of file
diff --git a/translator/tocwconfig/sampleConfig/statsd_eks_config.json b/translator/tocwconfig/sampleConfig/statsd_eks_config.json
new file mode 100644
index 0000000000..0f2908f98b
--- /dev/null
+++ b/translator/tocwconfig/sampleConfig/statsd_eks_config.json
@@ -0,0 +1,10 @@
+{
+  "metrics": {
+    "metrics_collected": {
+      "statsd": {
+        "metrics_aggregation_interval": 0,
+        "allowed_pending_messages": 10000
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/translator/tocwconfig/sampleConfig/statsd_eks_config.yaml b/translator/tocwconfig/sampleConfig/statsd_eks_config.yaml
new file mode 100644
index 0000000000..2ed69a21d4
--- /dev/null
+++ b/translator/tocwconfig/sampleConfig/statsd_eks_config.yaml
@@ -0,0 +1,61 @@
+exporters:
+    awscloudwatch:
+        force_flush_interval: 1m0s
+        max_datums_per_call: 1000
+        max_values_per_datum: 150
+        middleware: agenthealth/metrics
+        namespace: CWAgent
+        region: us-west-2
+        resource_to_telemetry_conversion:
+            enabled: true
+extensions:
+    agenthealth/metrics:
+        is_usage_data_enabled: true
+        stats:
+            operations:
+                - PutMetricData
+            usage_flags:
+                mode: EKS
+                region_type: ACJ
+    entitystore:
+        mode: ec2
+        region: us-west-2
+        kubernetes_mode: EKS
+    server:
+        listen_addr: :4311
+        tls_ca_path: "/etc/amazon-cloudwatch-observability-agent-client-cert/tls-ca.crt"
+        tls_cert_path: "/etc/amazon-cloudwatch-observability-agent-server-cert/server.crt"
+        tls_key_path: "/etc/amazon-cloudwatch-observability-agent-server-cert/server.key"
+receivers:
+    telegraf_statsd:
+        collection_interval: 10s
+        initial_delay: 1s
+        timeout: 0s
+service:
+    extensions:
+        - agenthealth/metrics
+        - entitystore
+        - server
+    pipelines:
+        metrics/hostCustomMetrics:
+            exporters:
+                - awscloudwatch
+            processors: []
+            receivers:
+                - telegraf_statsd
+    telemetry:
+        logs:
+            development: false
+            disable_caller: false
+            disable_stacktrace: false
+            encoding: console
+            level: info
+            sampling:
+                enabled: true
+                initial: 2
+                thereafter: 500
+                tick: 10s
+        metrics:
+            address: ""
+            level: None
+        traces: {}
diff --git a/translator/tocwconfig/tocwconfig_test.go b/translator/tocwconfig/tocwconfig_test.go
index 60b0d2ad9c..c607257a3a 100644
--- a/translator/tocwconfig/tocwconfig_test.go
+++ b/translator/tocwconfig/tocwconfig_test.go
@@ -136,6 +136,21 @@ func TestAppSignalsFallbackAndEKSConfig(t *testing.T) {
 	checkTranslation(t, "appsignals_fallback_and_eks_config", "windows", expectedEnvVars, "")
 }
 
+func TestStatsDAndEKSConfig(t *testing.T) {
+	resetContext(t)
+	context.CurrentContext().SetRunInContainer(true)
+	t.Setenv(config.HOST_NAME, "host_name_from_env")
+	t.Setenv(config.HOST_IP, "127.0.0.1")
+	t.Setenv(common.KubernetesEnvVar, "use_statsd_eks_config")
+	eksdetector.NewDetector = eksdetector.TestEKSDetector
+	context.CurrentContext().SetMode(config.ModeEC2)
+	context.CurrentContext().SetKubernetesMode(config.ModeEKS)
+
+	expectedEnvVars := map[string]string{}
+	checkTranslation(t, "statsd_eks_config", "linux", expectedEnvVars, "")
+	checkTranslation(t, "statsd_eks_config", "windows", expectedEnvVars, "")
+}
+
 func TestAppSignalsAndECSConfig(t *testing.T) {
 	resetContext(t)
 	context.CurrentContext().SetRunInContainer(true)
@@ -621,6 +636,22 @@ func TestECSNodeMetricConfig(t *testing.T) {
 	ecsSingleton.Region = ""
 }
 
+func TestECSNodeStatsDConfig(t *testing.T) {
+	resetContext(t)
+	context.CurrentContext().SetRunInContainer(true)
+	context.CurrentContext().SetMode(config.ModeEC2)
+	ecsSingleton := ecsutil.GetECSUtilSingleton()
+	ecsSingleton.Region = "us-west-2"
+	t.Setenv("RUN_IN_CONTAINER", "True")
+	t.Setenv("HOST_NAME", "fake-host-name")
+	t.Setenv("HOST_IP", "127.0.0.1")
+	expectedEnvVars := map[string]string{}
+	checkTranslation(t, "statsd_ecs_config", "linux", expectedEnvVars, "")
+	checkTranslation(t, "statsd_ecs_config", "darwin", nil, "")
+	//Reset back to default value to not impact other tests
+	ecsSingleton.Region = ""
+}
+
 func TestLogFilterConfig(t *testing.T) {
 	resetContext(t)
 	checkTranslation(t, "log_filter", "linux", nil, "")