Skip to content

Commit

Permalink
Annotate Infra, statsD, collectD metrics and Fluent-bit application l…
Browse files Browse the repository at this point in the history
…ogs with entities
  • Loading branch information
jefchien authored Oct 30, 2024
2 parents 1d3baa2 + 5c6298d commit ba0d929
Show file tree
Hide file tree
Showing 188 changed files with 46,911 additions and 2,418 deletions.
4 changes: 4 additions & 0 deletions extension/agenthealth/handler/stats/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Stats struct {
RunningInContainer *int `json:"ric,omitempty"`
RegionType *string `json:"rt,omitempty"`
Mode *string `json:"m,omitempty"`
EntityRejected *int `json:"ent,omitempty"`
}

// Merge the other Stats into the current. If the field is not nil,
Expand Down Expand Up @@ -76,6 +77,9 @@ func (s *Stats) Merge(other Stats) {
if other.Mode != nil {
s.Mode = other.Mode
}
if other.EntityRejected != nil {
s.EntityRejected = other.EntityRejected
}
}

func (s *Stats) Marshal() (string, error) {
Expand Down
27 changes: 27 additions & 0 deletions extension/agenthealth/handler/stats/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package client

import (
"bytes"
"context"
"io"
"net/http"
Expand All @@ -23,6 +24,10 @@ const (
cacheSize = 1000
)

var (
rejectedEntityInfo = []byte("\"rejectedEntityInfo\"")
)

type Stats interface {
awsmiddleware.RequestHandler
awsmiddleware.ResponseHandler
Expand Down Expand Up @@ -108,6 +113,9 @@ func (csh *clientStatsHandler) HandleResponse(ctx context.Context, r *http.Respo
}
latency := time.Since(recorder.start)
stats.LatencyMillis = aws.Int64(latency.Milliseconds())
if rejectedEntityInfoExists(r) {
stats.EntityRejected = aws.Int(1)
}
csh.statsByOperation.Store(operation, stats)
}

Expand All @@ -122,3 +130,22 @@ func (csh *clientStatsHandler) Stats(operation string) agent.Stats {
}
return stats
}

// rejectedEntityInfoExists checks if the response body
// contains element rejectedEntityInfo
func rejectedEntityInfoExists(r *http.Response) bool {
// Example body for rejectedEntityInfo would be:
// {"rejectedEntityInfo":{"errorType":"InvalidAttributes"}}
if r == nil || r.Body == nil {
return false
}
bodyBytes, err := io.ReadAll(r.Body)
r.Body.Close()
// Reset the response body stream since it can only be read once. Not doing this results in duplicate requests.
// See https://stackoverflow.com/questions/33532374/in-go-how-can-i-reuse-a-readcloser
r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
if err != nil {
return false
}
return bytes.Contains(bodyBytes, rejectedEntityInfo)
}
21 changes: 20 additions & 1 deletion extension/agenthealth/handler/stats/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package client
import (
"bytes"
"context"
"io"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -37,11 +38,16 @@ func TestHandle(t *testing.T) {
assert.Nil(t, got.PayloadBytes)
assert.Nil(t, got.StatusCode)
time.Sleep(time.Millisecond)
handler.HandleResponse(ctx, &http.Response{StatusCode: http.StatusOK})
handler.HandleResponse(ctx, &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewBufferString(`{"rejectedEntityInfo":{"errorType":"InvalidAttributes"}}`)),
})
got = handler.Stats(operation)
assert.NotNil(t, got.LatencyMillis)
assert.NotNil(t, got.PayloadBytes)
assert.NotNil(t, got.StatusCode)
assert.NotNil(t, got.EntityRejected)
assert.Equal(t, 1, *got.EntityRejected)
assert.Equal(t, http.StatusOK, *got.StatusCode)
assert.Equal(t, 20, *got.PayloadBytes)
assert.GreaterOrEqual(t, *got.LatencyMillis, int64(1))
Expand All @@ -65,3 +71,16 @@ func TestHandle(t *testing.T) {
assert.NotNil(t, got.PayloadBytes)
assert.Equal(t, 29, *got.PayloadBytes)
}

func BenchmarkRejectedEntityInfoExists(b *testing.B) {
body := `{"rejectedEntityInfo":{"errorType":"InvalidAttributes"}}`
resp := &http.Response{
Body: io.NopCloser(bytes.NewBufferString(body)),
}

for n := 0; n < b.N; n++ {
rejectedEntityInfoExists(resp)
// Reset the body for the next iteration
resp.Body = io.NopCloser(bytes.NewBufferString(body))
}
}
19 changes: 19 additions & 0 deletions extension/entitystore/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package entitystore

import (
"go.opentelemetry.io/collector/component"
)

type Config struct {
Mode string `mapstructure:"mode"`
KubernetesMode string `mapstructure:"kubernetes_mode,omitempty"`
Region string `mapstructure:"region"`
Profile string `mapstructure:"profile,omitempty"`
RoleARN string `mapstructure:"role_arn,omitempty"`
Filename string `mapstructure:"shared_credential_file,omitempty"`
}

var _ component.Config = (*Config)(nil)
18 changes: 18 additions & 0 deletions extension/entitystore/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package entitystore

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/confmap"
)

func TestUnmarshalDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NoError(t, confmap.New().Unmarshal(cfg))
assert.Equal(t, factory.CreateDefaultConfig(), cfg)
}
163 changes: 163 additions & 0 deletions extension/entitystore/ec2Info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package entitystore

import (
"context"
"errors"
"strings"
"sync"
"time"

"go.uber.org/zap"

"github.com/aws/amazon-cloudwatch-agent/internal/ec2metadataprovider"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/ec2tagger"
)

const (
// InstanceId character maximum length is 19.
// See https://docs.aws.amazon.com/autoscaling/ec2/APIReference/API_Instance.html.
instanceIdSizeMax = 19

// AutoScalingGroup character maximum length is 255.
// See https://docs.aws.amazon.com/autoscaling/ec2/APIReference/API_AutoScalingGroup.html.
autoScalingGroupSizeMax = 255
)

type EC2Info struct {
InstanceID string
AccountID string
AutoScalingGroup string

// region is used while making call to describeTags Ec2 API for AutoScalingGroup
Region string

metadataProvider ec2metadataprovider.MetadataProvider
logger *zap.Logger
done chan struct{}
mutex sync.RWMutex
}

func (ei *EC2Info) initEc2Info() {
ei.logger.Debug("Initializing EC2Info")
if err := ei.setInstanceIDAccountID(); err != nil {
return
}
if err := ei.setAutoScalingGroup(); err != nil {
return
}
ei.logger.Debug("Finished initializing EC2Info")
}

func (ei *EC2Info) GetInstanceID() string {
ei.mutex.RLock()
defer ei.mutex.RUnlock()
return ei.InstanceID
}

func (ei *EC2Info) GetAccountID() string {
ei.mutex.RLock()
defer ei.mutex.RUnlock()
return ei.AccountID
}

func (ei *EC2Info) GetAutoScalingGroup() string {
ei.mutex.RLock()
defer ei.mutex.RUnlock()
return ei.AutoScalingGroup
}

func (ei *EC2Info) setInstanceIDAccountID() error {
for {
metadataDoc, err := ei.metadataProvider.Get(context.Background())
if err != nil {
ei.logger.Warn("Failed to get Instance ID / Account ID through metadata provider", zap.Error(err))
wait := time.NewTimer(1 * time.Minute)
select {
case <-ei.done:
wait.Stop()
return errors.New("shutdown signal received")
case <-wait.C:
continue
}
}
ei.logger.Debug("Successfully retrieved Instance ID and Account ID")
ei.mutex.Lock()
ei.InstanceID = metadataDoc.InstanceID
if idLength := len(ei.InstanceID); idLength > instanceIdSizeMax {
ei.logger.Warn("InstanceId length exceeds characters limit and will be ignored", zap.Int("length", idLength), zap.Int("character limit", instanceIdSizeMax))
ei.InstanceID = ""
}
ei.AccountID = metadataDoc.AccountID
ei.mutex.Unlock()
return nil
}
}

func (ei *EC2Info) setAutoScalingGroup() error {
retry := 0
for {
var waitDuration time.Duration
if retry < len(ec2tagger.BackoffSleepArray) {
waitDuration = ec2tagger.BackoffSleepArray[retry]
} else {
waitDuration = ec2tagger.BackoffSleepArray[len(ec2tagger.BackoffSleepArray)-1]
}

wait := time.NewTimer(waitDuration)
select {
case <-ei.done:
wait.Stop()
return errors.New("shutdown signal received")
case <-wait.C:
}

if retry > 0 {
ei.logger.Debug("Initial retrieval of tags and volumes", zap.Int("retry", retry))
}

if err := ei.retrieveAsgName(); err != nil {
ei.logger.Warn("Unable to fetch instance tags with imds", zap.Int("retry", retry), zap.Error(err))
} else {
ei.logger.Debug("Retrieval of auto-scaling group tags succeeded")
return nil
}

retry++
}

}

func (ei *EC2Info) retrieveAsgName() error {
tags, err := ei.metadataProvider.InstanceTags(context.Background())
if err != nil {
ei.logger.Debug("Failed to get tags through metadata provider", zap.Error(err))
return err
} else if strings.Contains(tags, ec2tagger.Ec2InstanceTagKeyASG) {
asg, err := ei.metadataProvider.InstanceTagValue(context.Background(), ec2tagger.Ec2InstanceTagKeyASG)
if err != nil {
ei.logger.Error("Failed to get AutoScalingGroup through metadata provider", zap.Error(err))
} else {
ei.logger.Debug("AutoScalingGroup retrieved through IMDS")
ei.mutex.Lock()
ei.AutoScalingGroup = asg
if asgLength := len(ei.AutoScalingGroup); asgLength > autoScalingGroupSizeMax {
ei.logger.Warn("AutoScalingGroup length exceeds characters limit and will be ignored", zap.Int("length", asgLength), zap.Int("character limit", autoScalingGroupSizeMax))
ei.AutoScalingGroup = ""
}
ei.mutex.Unlock()
}
}
return nil
}

func newEC2Info(metadataProvider ec2metadataprovider.MetadataProvider, done chan struct{}, region string, logger *zap.Logger) *EC2Info {
return &EC2Info{
metadataProvider: metadataProvider,
done: done,
Region: region,
logger: logger,
}
}
Loading

0 comments on commit ba0d929

Please sign in to comment.