Skip to content

Commit

Permalink
Adding agent health extensions to translators
Browse files Browse the repository at this point in the history
  • Loading branch information
Paramadon committed Dec 16, 2024
1 parent ad90d9e commit e37caa9
Show file tree
Hide file tree
Showing 86 changed files with 7,742 additions and 7,145 deletions.
17 changes: 16 additions & 1 deletion internal/ecsservicediscovery/servicediscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ecs"
Expand All @@ -23,6 +24,7 @@ type ServiceDiscovery struct {

stats ProcessorStats
clusterProcessors []Processor
Configurer *awsmiddleware.Configurer
}

func (sd *ServiceDiscovery) init() {
Expand All @@ -31,8 +33,21 @@ func (sd *ServiceDiscovery) init() {
}
configProvider := credentialConfig.Credentials()
sd.svcEcs = ecs.New(configProvider, aws.NewConfig().WithRegion(sd.Config.TargetClusterRegion).WithMaxRetries(AwsSdkLevelRetryCount))

if sd.Configurer != nil {
if err := sd.Configurer.Configure(awsmiddleware.SDKv1(&sd.svcEcs.Handlers)); err != nil {
log.Printf("ERROR: Failed to configure ECS client: %v", err)
}
}

sd.svcEc2 = ec2.New(configProvider, aws.NewConfig().WithRegion(sd.Config.TargetClusterRegion).WithMaxRetries(AwsSdkLevelRetryCount))

if sd.Configurer != nil {
if err := sd.Configurer.Configure(awsmiddleware.SDKv1(&sd.svcEc2.Handlers)); err != nil {
log.Printf("ERROR: Failed to configure EC2 client: %v", err)
}
}

sd.initClusterProcessorPipeline()
}

Expand Down Expand Up @@ -70,8 +85,8 @@ func StartECSServiceDiscovery(sd *ServiceDiscovery, shutDownChan chan interface{

func (sd *ServiceDiscovery) work() {
sd.stats.ResetStats()
var err error
var clusterTasks []*DecoratedTask
var err error
for _, p := range sd.clusterProcessors {
clusterTasks, err = p.Process(sd.Config.TargetCluster, clusterTasks)
// Ignore partial result to avoid overwriting existing targets
Expand Down
42 changes: 42 additions & 0 deletions internal/ecsservicediscovery/servicediscovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"sync"
"testing"

"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func Test_ServiceDiscovery_InitPipelines(t *testing.T) {
Expand Down Expand Up @@ -70,3 +72,43 @@ func Test_StartECSServiceDiscovery_BadClusterConfig(t *testing.T) {
StartECSServiceDiscovery(p, nil, &wg)
assert.Equal(t, 0, len(p.clusterProcessors))
}

func Test_StartECSServiceDiscovery_WithConfigurer(t *testing.T) {
var wg sync.WaitGroup
var requestHandlers []awsmiddleware.RequestHandler

handler := new(awsmiddleware.MockHandler)
handler.On("ID").Return("mock")
handler.On("Position").Return(awsmiddleware.After)
handler.On("HandleRequest", mock.Anything, mock.Anything)
handler.On("HandleResponse", mock.Anything, mock.Anything)
requestHandlers = append(requestHandlers, handler)
middleware := new(awsmiddleware.MockMiddlewareExtension)
middleware.On("Handlers").Return(
requestHandlers,
[]awsmiddleware.ResponseHandler{handler},
)
c := awsmiddleware.NewConfigurer(middleware.Handlers())

config := ServiceDiscoveryConfig{
TargetCluster: "test",
TargetClusterRegion: "us-east-1",
}
p := &ServiceDiscovery{Config: &config, Configurer: c}
wg.Add(1)
StartECSServiceDiscovery(p, nil, &wg)
assert.Equal(t, 0, len(p.clusterProcessors))
}

func Test_StartECSServiceDiscovery_WithoutConfigurer(t *testing.T) {
var wg sync.WaitGroup

config := ServiceDiscoveryConfig{
TargetCluster: "test",
TargetClusterRegion: "us-east-1",
}
p := &ServiceDiscovery{Config: &config, Configurer: nil}
wg.Add(1)
StartECSServiceDiscovery(p, nil, &wg)
assert.Equal(t, 0, len(p.clusterProcessors))
}
35 changes: 31 additions & 4 deletions plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@ import (
_ "embed"
"sync"

"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"go.uber.org/zap"

"github.com/aws/amazon-cloudwatch-agent/cfg/envconfig"
"github.com/aws/amazon-cloudwatch-agent/extension/agenthealth"
"github.com/aws/amazon-cloudwatch-agent/internal/ecsservicediscovery"
)

Expand All @@ -23,6 +27,7 @@ type Prometheus struct {
mbCh chan PrometheusMetricBatch
shutDownChan chan interface{}
wg sync.WaitGroup
middleware awsmiddleware.Middleware
}

func (p *Prometheus) SampleConfig() string {
Expand All @@ -39,19 +44,33 @@ func (p *Prometheus) Gather(_ telegraf.Accumulator) error {

func (p *Prometheus) Start(accIn telegraf.Accumulator) error {
mth := NewMetricsTypeHandler()

receiver := &metricsReceiver{pmbCh: p.mbCh}
handler := &metricsHandler{mbCh: p.mbCh,
handler := &metricsHandler{
mbCh: p.mbCh,
acc: accIn,
calculator: NewCalculator(),
filter: NewMetricsFilter(),
clusterName: p.ClusterName,
mtHandler: mth,
}

ecssd := &ecsservicediscovery.ServiceDiscovery{Config: p.ECSSDConfig}
var configurer *awsmiddleware.Configurer
var ecssd *ecsservicediscovery.ServiceDiscovery
needEcssd := true

if p.middleware != nil {
configurer = awsmiddleware.NewConfigurer(p.middleware.Handlers())
if configurer != nil {
ecssd = &ecsservicediscovery.ServiceDiscovery{Config: p.ECSSDConfig, Configurer: configurer}
needEcssd = false
}
}
if needEcssd {
ecssd = &ecsservicediscovery.ServiceDiscovery{Config: p.ECSSDConfig}
}

// Start ECS Service Discovery when in ECS
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/ContainerInsights-Prometheus-Setup-autodiscovery-ecs.html
// Launch ECS Service Discovery as a goroutine
p.wg.Add(1)
go ecsservicediscovery.StartECSServiceDiscovery(ecssd, p.shutDownChan, &p.wg)

Expand All @@ -77,6 +96,14 @@ func init() {
return &Prometheus{
mbCh: make(chan PrometheusMetricBatch, 10000),
shutDownChan: make(chan interface{}),
middleware: agenthealth.NewAgentHealth(
zap.NewNop(),
&agenthealth.Config{
IsUsageDataEnabled: envconfig.IsUsageDataEnabled(),
IsStatusCodeEnabled: true,
},
),
}

})
}
5 changes: 3 additions & 2 deletions plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,9 @@ func init() {
middleware: agenthealth.NewAgentHealth(
zap.NewNop(),
&agenthealth.Config{
IsUsageDataEnabled: envconfig.IsUsageDataEnabled(),
Stats: &agent.StatsConfig{Operations: []string{"PutLogEvents"}},
IsUsageDataEnabled: envconfig.IsUsageDataEnabled(),
Stats: &agent.StatsConfig{Operations: []string{"PutLogEvents"}},
IsStatusCodeEnabled: true,
},
),
}
Expand Down
2 changes: 2 additions & 0 deletions plugins/processors/ec2tagger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Config struct {
Filename string `mapstructure:"shared_credential_file,omitempty"`
Token string `mapstructure:"token,omitempty"`
IMDSRetries int `mapstructure:"imds_retries,omitempty"`

MiddlewareID *component.ID `mapstructure:"middleware,omitempty"`
}

// Verify Config implements Processor interface.
Expand Down
22 changes: 12 additions & 10 deletions plugins/processors/ec2tagger/ec2tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"time"

"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
Expand Down Expand Up @@ -56,14 +57,14 @@ type Tagger struct {
ec2API ec2iface.EC2API
volumeSerialCache volume.Cache

Configurer *awsmiddleware.Configurer
sync.RWMutex //to protect ec2TagCache
}

// newTagger returns a new EC2 Tagger processor.
func newTagger(config *Config, logger *zap.Logger) *Tagger {
_, cancel := context.WithCancel(context.Background())
mdCredentialConfig := &configaws.CredentialConfig{}

p := &Tagger{
Config: config,
logger: logger,
Expand Down Expand Up @@ -280,14 +281,12 @@ func (t *Tagger) ebsVolumesRetrieved() bool {

// Start acts as input validation and serves the purpose of updating ec2 tags and ebs volumes if necessary.
// It will be called when OTel is enabling each processor
func (t *Tagger) Start(ctx context.Context, _ component.Host) error {
func (t *Tagger) Start(ctx context.Context, host component.Host) error {
t.shutdownC = make(chan bool)
t.ec2TagCache = map[string]string{}

if err := t.deriveEC2MetadataFromIMDS(ctx); err != nil {
return err
}

t.tagFilters = []*ec2.Filter{
{
Name: aws.String("resource-type"),
Expand All @@ -298,12 +297,10 @@ func (t *Tagger) Start(ctx context.Context, _ component.Host) error {
Values: aws.StringSlice([]string{t.ec2MetadataRespond.instanceId}),
},
}

// if the customer said 'AutoScalingGroupName' (the CW dimension), do what they mean not what they said
// and filter for the EC2 tag name called 'aws:autoscaling:groupName'
useAllTags := len(t.EC2InstanceTagKeys) == 1 && t.EC2InstanceTagKeys[0] == "*"

if !useAllTags && len(t.EC2InstanceTagKeys) > 0 {
// if the customer said 'AutoScalingGroupName' (the CW dimension), do what they mean not what they said
// and filter for the EC2 tag name called 'aws:autoscaling:groupName'
for i, key := range t.EC2InstanceTagKeys {
if cwDimensionASG == key {
t.EC2InstanceTagKeys[i] = Ec2InstanceTagKeyASG
Expand All @@ -315,7 +312,6 @@ func (t *Tagger) Start(ctx context.Context, _ component.Host) error {
Values: aws.StringSlice(t.EC2InstanceTagKeys),
})
}

if len(t.EC2InstanceTagKeys) > 0 || len(t.EBSDeviceKeys) > 0 {
ec2CredentialConfig := &configaws.CredentialConfig{
AccessKey: t.AccessKey,
Expand All @@ -327,6 +323,13 @@ func (t *Tagger) Start(ctx context.Context, _ component.Host) error {
Region: t.ec2MetadataRespond.region,
}
t.ec2API = t.ec2Provider(ec2CredentialConfig)

if client, ok := t.ec2API.(*ec2.EC2); ok {
if t.Config.MiddlewareID != nil {
awsmiddleware.TryConfigure(t.logger, host, *t.Config.MiddlewareID, awsmiddleware.SDKv1(&client.Handlers))
}
}

go func() { //Async start of initial retrieval to prevent block of agent start
t.initialRetrievalOfTagsAndVolumes()
t.refreshLoopToUpdateTagsAndVolumes()
Expand All @@ -336,7 +339,6 @@ func (t *Tagger) Start(ctx context.Context, _ component.Host) error {
} else {
t.setStarted()
}

return nil
}

Expand Down
12 changes: 10 additions & 2 deletions translator/tocwconfig/sampleConfig/advanced_config_darwin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ extensions:
usage_flags:
mode: EC2
region_type: ACJ
agenthealth/statuscode:
is_status_code_enabled: true
is_usage_data_enabled: true
stats:
usage_flags:
mode: EC2
region_type: ACJ
entitystore:
mode: ec2
region: us-west-2
Expand All @@ -38,9 +45,9 @@ processors:
ec2_instance_tag_keys:
- AutoScalingGroupName
ec2_metadata_tags:
- InstanceType
- ImageId
- InstanceId
- InstanceType
imds_retries: 1
refresh_interval_seconds: 0s
receivers:
Expand Down Expand Up @@ -71,6 +78,7 @@ receivers:
service:
extensions:
- agenthealth/metrics
- agenthealth/statuscode
- entitystore
pipelines:
metrics/host:
Expand All @@ -81,10 +89,10 @@ service:
- ec2tagger
receivers:
- telegraf_disk
- telegraf_mem
- telegraf_netstat
- telegraf_swap
- telegraf_cpu
- telegraf_mem
metrics/hostDeltaMetrics:
exporters:
- awscloudwatch
Expand Down
18 changes: 13 additions & 5 deletions translator/tocwconfig/sampleConfig/advanced_config_linux.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@ extensions:
usage_flags:
mode: EC2
region_type: ACJ
agenthealth/statuscode:
is_status_code_enabled: true
is_usage_data_enabled: true
stats:
usage_flags:
mode: EC2
region_type: ACJ
entitystore:
mode: ec2
region: us-west-2
processors:
awsentity/resource:
entity_type: Resource
platform: ec2
platform: ec2
cumulativetodelta/hostDeltaMetrics:
exclude:
match_type: strict
Expand All @@ -38,9 +45,9 @@ processors:
ec2_instance_tag_keys:
- AutoScalingGroupName
ec2_metadata_tags:
- InstanceType
- ImageId
- InstanceId
- InstanceType
imds_retries: 1
refresh_interval_seconds: 0s
receivers:
Expand Down Expand Up @@ -79,6 +86,7 @@ receivers:
service:
extensions:
- agenthealth/metrics
- agenthealth/statuscode
- entitystore
pipelines:
metrics/host:
Expand All @@ -88,13 +96,13 @@ service:
- awsentity/resource
- ec2tagger
receivers:
- telegraf_swap
- telegraf_ethtool
- telegraf_nvidia_smi
- telegraf_cpu
- telegraf_disk
- telegraf_mem
- telegraf_netstat
- telegraf_swap
- telegraf_ethtool
- telegraf_nvidia_smi
metrics/hostDeltaMetrics:
exporters:
- awscloudwatch
Expand Down
Loading

0 comments on commit e37caa9

Please sign in to comment.