Skip to content

Commit

Permalink
Enable awscloudwatch input to collect logs from linked source accounts.
Browse files Browse the repository at this point in the history
For cross account monitoring, we need to use the log group ARN, instead
of log group name in order to retreive logs from outside the monitoring
account. This change used ARN throughout (but still requires a
conversion back to get the 'plain' log group name to construct the log metadata
at the end - not ideal).
  • Loading branch information
tommyers-elastic committed Sep 21, 2023
1 parent edc9851 commit 844df38
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 33 deletions.
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ require (
github.com/apoydence/eachers v0.0.0-20181020210610-23942921fe77 // indirect
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
github.com/aws/aws-lambda-go v1.13.3
github.com/aws/aws-sdk-go-v2 v1.18.0
github.com/aws/aws-sdk-go-v2 v1.21.0
github.com/aws/aws-sdk-go-v2/config v1.17.7
github.com/aws/aws-sdk-go-v2/credentials v1.12.20
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.26.0
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.15.5
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.24.0
github.com/aws/aws-sdk-go-v2/service/costexplorer v1.18.4
github.com/aws/aws-sdk-go-v2/service/ec2 v1.36.1
github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.18.4
Expand Down Expand Up @@ -198,7 +198,7 @@ require (
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.33
github.com/aws/aws-sdk-go-v2/service/cloudformation v1.20.4
github.com/aws/aws-sdk-go-v2/service/kinesis v1.15.8
github.com/aws/smithy-go v1.13.5
github.com/aws/smithy-go v1.14.2
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5
github.com/elastic/bayeux v1.0.5
github.com/elastic/elastic-agent-autodiscover v0.6.2
Expand Down Expand Up @@ -249,8 +249,8 @@ require (
github.com/armon/go-radix v1.0.0 // indirect
github.com/aws/aws-sdk-go v1.38.60 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.8 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.41 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.35 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.24 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.14 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.9 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ github.com/aws/aws-sdk-go-v2 v1.16.6/go.mod h1:6CpKuLXg2w7If3ABZCl/qZ6rEgwtjZTn4
github.com/aws/aws-sdk-go-v2 v1.16.16/go.mod h1:SwiyXi/1zTUZ6KIAmLK5V5ll8SiURNUYOqTerZPaF9k=
github.com/aws/aws-sdk-go-v2 v1.18.0 h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY=
github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2 v1.21.0 h1:gMT0IW+03wtYJhRqTVYn0wLzwdnK9sRMcxmtfGzRdJc=
github.com/aws/aws-sdk-go-v2 v1.21.0/go.mod h1:/RfNgGmRxI+iFOB1OeJUyxiU+9s88k3pfHvDagGEp0M=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.3/go.mod h1:gNsR5CaXKmQSSzrmGxmwmct/r+ZBfbxorAuXYsj/M5Y=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.8 h1:tcFliCWne+zOuUfKNRn8JdFBuWPDuISDH08wD2ULkhk=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.8/go.mod h1:JTnlBSot91steJeti4ryyu/tLd4Sk84O5W22L7O2EQU=
Expand All @@ -303,11 +305,15 @@ github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.13/go.mod h1:wLLesU+LdM
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.23/go.mod h1:2DFxAQ9pfIRy0imBCJv+vZ2X6RKxves6fbnEuSry6b4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 h1:kG5eQilShqmJbv11XL1VpyDbaEJzWxd4zRiCG30GSn4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.41 h1:22dGT7PneFMx4+b3pz7lMTRyN8ZKH7M2cW4GP9yUS2g=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.41/go.mod h1:CrObHAuPneJBlfEJ5T3szXOUkLEThaGfvnhTf33buas=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.4/go.mod h1:8glyUqVIM4AmeenIsPo0oVh3+NUwnsQml2OFupfQW+0=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.7/go.mod h1:93Uot80ddyVzSl//xEJreNKMhxntr71WtR3v/A1cRYk=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.17/go.mod h1:pRwaTYCJemADaqCbUAxltMoHKata7hmB5PjEXeu0kfg=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 h1:vFQlirhuM8lLlpI7imKOMsjdQLuN9CPi+k44F/OFVsk=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.35 h1:SijA0mgjV8E+8G45ltVHs0fvKpTj8xmZJ3VwhGKtUSI=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.35/go.mod h1:SJC1nEVVva1g3pHAIdCp7QsRIkMmLAgoDquQ9Rr8kYw=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.24 h1:wj5Rwc05hvUSvKuOF29IYb9QrCLjU+rHAy/x/o0DK2c=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.24/go.mod h1:jULHjqqjDlbyTa7pfM7WICATnOv+iOhjletM3N0Xbu8=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.14 h1:ZSIPAkAsCCjYrhqfw2+lNzWDzxzHXEckFkTePL5RSWQ=
Expand All @@ -318,6 +324,8 @@ github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.26.0 h1:sSzrsKQULJmPtmu6By4wR
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.26.0/go.mod h1:t5mizLPjCYafXoHCXOHJU7z4OvLbY70Echvb1ciBTV4=
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.15.5 h1:aPK8IBVKeozo/pNGshT8xOJ2V3Y7ykOM49QcY0vhUSM=
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.15.5/go.mod h1:ErjxucZaraVbYm66xxub00qmGBw7md2RFqy6624KbR8=
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.24.0 h1:6LRil7J+uh2SZ58Wkm/5aVRpBOZbTtwi8p8gdsix94c=
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.24.0/go.mod h1:5v2ZNXCSwG73rx0k3sCuB1Ju8sbEbG0iUlxCA7D8sV8=
github.com/aws/aws-sdk-go-v2/service/costexplorer v1.18.4 h1:jbfG3cbq1kiK1/OAfUh4zf1ADtAU8KoeOPfF94S96pU=
github.com/aws/aws-sdk-go-v2/service/costexplorer v1.18.4/go.mod h1:yC5cDNa3xzSh5NIU5x0NBBo6QkcsaM0tuPNCczeUPoU=
github.com/aws/aws-sdk-go-v2/service/ec2 v1.36.1 h1:FS8Ja6LuLDVHcX+rmoNpOXqYb52N2A5DwQy7Dgduq4Q=
Expand Down Expand Up @@ -360,6 +368,8 @@ github.com/aws/smithy-go v1.12.0/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J
github.com/aws/smithy-go v1.13.3/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/aws/smithy-go v1.14.2 h1:MJU9hqBGbvWZdApzpvoF2WAIJDbtjK2NDJSiJP7HblQ=
github.com/aws/smithy-go v1.14.2/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/awslabs/goformation/v3 v3.1.0/go.mod h1:hQ5RXo3GNm2laHWKizDzU5DsDy+yNcenSca2UxN0850=
github.com/awslabs/goformation/v4 v4.1.0 h1:JRxIW0IjhYpYDrIZOTJGMu2azXKI+OK5dP56ubpywGU=
github.com/awslabs/goformation/v4 v4.1.0/go.mod h1:MBDN7u1lMNDoehbFuO4uPvgwPeolTMA2TzX1yO6KlxI=
Expand Down
14 changes: 7 additions & 7 deletions x-pack/filebeat/input/awscloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics,
}
}

func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) {
err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor)
func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroupARN string, startTime int64, endTime int64, logProcessor *logProcessor) {
err := p.getLogEventsFromCloudWatch(svc, logGroupARN, startTime, endTime, logProcessor)
if err != nil {
var errRequestCanceled *awssdk.RequestCanceledError
if errors.As(err, &errRequestCanceled) {
Expand All @@ -68,9 +68,9 @@ func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, star
}

// getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch
func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) error {
func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroupARN string, startTime int64, endTime int64, logProcessor *logProcessor) error {
// construct FilterLogEventsInput
filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup)
filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroupARN)
paginator := cloudwatchlogs.NewFilterLogEventsPaginator(svc, filterLogEventsInput)
for paginator.HasMorePages() {
filterLogEventsOutput, err := paginator.NextPage(context.TODO())
Expand All @@ -88,14 +88,14 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client
p.log.Debug("done sleeping")

p.log.Debugf("Processing #%v events", len(logEvents))
logProcessor.processLogEvents(logEvents, logGroup, p.region)
logProcessor.processLogEvents(logEvents, logGroupARN, p.region)
}
return nil
}

func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput {
func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroupARN string) *cloudwatchlogs.FilterLogEventsInput {
filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{
LogGroupName: awssdk.String(logGroup),
LogGroupIdentifier: awssdk.String(logGroupARN),
StartTime: awssdk.Int64(startTime),
EndTime: awssdk.Int64(endTime),
}
Expand Down
36 changes: 18 additions & 18 deletions x-pack/filebeat/input/awscloudwatch/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline)
}
})

logGroupNames, err := getLogGroupNames(svc, in.config.LogGroupNamePrefix, in.config.LogGroupName)
logGroupARNs, err := getLogGroupARNs(svc, in.config.LogGroupNamePrefix, in.config.LogGroupName)
if err != nil {
return fmt.Errorf("failed to get log group names: %w", err)
return fmt.Errorf("failed to get log group ARNs: %w", err)
}

log := inputContext.Logger
Expand All @@ -143,11 +143,11 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline)
in.config.LogStreams,
in.config.LogStreamPrefix)
logProcessor := newLogProcessor(log.Named("log_processor"), in.metrics, client, ctx)
cwPoller.metrics.logGroupsTotal.Add(uint64(len(logGroupNames)))
return in.Receive(svc, cwPoller, ctx, logProcessor, logGroupNames)
cwPoller.metrics.logGroupsTotal.Add(uint64(len(logGroupARNs)))
return in.Receive(svc, cwPoller, ctx, logProcessor, logGroupARNs)
}

func (in *cloudwatchInput) Receive(svc *cloudwatchlogs.Client, cwPoller *cloudwatchPoller, ctx context.Context, logProcessor *logProcessor, logGroupNames []string) error {
func (in *cloudwatchInput) Receive(svc *cloudwatchlogs.Client, cwPoller *cloudwatchPoller, ctx context.Context, logProcessor *logProcessor, logGroupARNs []string) error {
// This loop tries to keep the workers busy as much as possible while
// honoring the number in config opposed to a simpler loop that does one
// listing, sequentially processes every object and then does another listing
Expand Down Expand Up @@ -175,17 +175,17 @@ func (in *cloudwatchInput) Receive(svc *cloudwatchlogs.Client, cwPoller *cloudwa
}

workerWg.Add(availableWorkers)
logGroupNamesLength := len(logGroupNames)
logGroupARNsLength := len(logGroupARNs)
runningGoroutines := 0

for i := lastLogGroupOffset; i < logGroupNamesLength; i++ {
for i := lastLogGroupOffset; i < logGroupARNsLength; i++ {
if runningGoroutines >= availableWorkers {
break
}

runningGoroutines++
lastLogGroupOffset = i + 1
if lastLogGroupOffset >= logGroupNamesLength {
if lastLogGroupOffset >= logGroupARNsLength {
// release unused workers
cwPoller.workerSem.Release(availableWorkers - runningGoroutines)
for j := 0; j < availableWorkers-runningGoroutines; j++ {
Expand All @@ -194,15 +194,15 @@ func (in *cloudwatchInput) Receive(svc *cloudwatchlogs.Client, cwPoller *cloudwa
lastLogGroupOffset = 0
}

lg := logGroupNames[i]
go func(logGroup string, startTime int64, endTime int64) {
lg := logGroupARNs[i]
go func(logGroupARN string, startTime int64, endTime int64) {
defer func() {
cwPoller.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", logGroup)
cwPoller.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", logGroupARN)
workerWg.Done()
cwPoller.workerSem.Release(1)
}()
cwPoller.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", logGroup)
cwPoller.run(svc, logGroup, startTime, endTime, logProcessor)
cwPoller.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", logGroupARN)
cwPoller.run(svc, logGroupARN, startTime, endTime, logProcessor)
}(lg, cwPoller.startTime, cwPoller.endTime)
}
}
Expand Down Expand Up @@ -231,8 +231,8 @@ func parseARN(logGroupARN string) (string, string, error) {
return "", "", fmt.Errorf("cannot get log group name from log group ARN: %s", logGroupARN)
}

// getLogGroupNames uses DescribeLogGroups API to retrieve all log group names
func getLogGroupNames(svc *cloudwatchlogs.Client, logGroupNamePrefix string, logGroupName string) ([]string, error) {
// getLogGroupARNs uses DescribeLogGroups API to retrieve all log group names
func getLogGroupARNs(svc *cloudwatchlogs.Client, logGroupNamePrefix string, logGroupName string) ([]string, error) {
if logGroupNamePrefix == "" {
return []string{logGroupName}, nil
}
Expand All @@ -243,7 +243,7 @@ func getLogGroupNames(svc *cloudwatchlogs.Client, logGroupNamePrefix string, log
}

// make API request
var logGroupNames []string
var logGroupARNs []string
paginator := cloudwatchlogs.NewDescribeLogGroupsPaginator(svc, describeLogGroupsInput)
for paginator.HasMorePages() {
page, err := paginator.NextPage(context.TODO())
Expand All @@ -252,10 +252,10 @@ func getLogGroupNames(svc *cloudwatchlogs.Client, logGroupNamePrefix string, log
}

for _, lg := range page.LogGroups {
logGroupNames = append(logGroupNames, *lg.LogGroupName)
logGroupARNs = append(logGroupARNs, *lg.Arn)
}
}
return logGroupNames, nil
return logGroupARNs, nil
}

func getStartPosition(startPosition string, currentTime time.Time, endTime int64, scanFrequency time.Duration, latency time.Duration) (int64, int64) {
Expand Down
10 changes: 7 additions & 3 deletions x-pack/filebeat/input/awscloudwatch/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ func newLogProcessor(log *logp.Logger, metrics *inputMetrics, publisher beat.Cli
}
}

func (p *logProcessor) processLogEvents(logEvents []types.FilteredLogEvent, logGroup string, regionName string) {
func (p *logProcessor) processLogEvents(logEvents []types.FilteredLogEvent, logGroupARN string, regionName string) {
for _, logEvent := range logEvents {
event := createEvent(logEvent, logGroup, regionName)
event := createEvent(logEvent, logGroupARN, regionName)
p.publish(p.ack, &event)
}
}
Expand All @@ -49,7 +49,11 @@ func (p *logProcessor) publish(ack *awscommon.EventACKTracker, event *beat.Event
p.publisher.Publish(*event)
}

func createEvent(logEvent types.FilteredLogEvent, logGroup string, regionName string) beat.Event {
func createEvent(logEvent types.FilteredLogEvent, logGroupARN string, regionName string) beat.Event {
logGroup, _, err := parseARN(logGroupARN)
if err != nil {
// ????
}
event := beat.Event{
Timestamp: time.Unix(*logEvent.Timestamp/1000, 0).UTC(),
Fields: mapstr.M{
Expand Down

0 comments on commit 844df38

Please sign in to comment.