From 844df381d581d36d56dbae51c15201dd03081399 Mon Sep 17 00:00:00 2001 From: tommyers-elastic <106530686+tommyers-elastic@users.noreply.github.com> Date: Thu, 21 Sep 2023 15:15:57 +0100 Subject: [PATCH] Enable awscloudwatch input to collect logs from linked source accounts. For cross account monitoring, we need to use the log group ARN, instead of log group name in order to retreive logs from outside the monitoring account. This change used ARN throughout (but still requires a conversion back to get the 'plain' log group name to construct the log metadata at the end - not ideal). --- go.mod | 10 +++--- go.sum | 10 ++++++ .../input/awscloudwatch/cloudwatch.go | 14 ++++---- x-pack/filebeat/input/awscloudwatch/input.go | 36 +++++++++---------- .../filebeat/input/awscloudwatch/processor.go | 10 ++++-- 5 files changed, 47 insertions(+), 33 deletions(-) diff --git a/go.mod b/go.mod index a34d9f460d58..fbc45656f9d8 100644 --- a/go.mod +++ b/go.mod @@ -27,11 +27,11 @@ require ( github.com/apoydence/eachers v0.0.0-20181020210610-23942921fe77 // indirect github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 github.com/aws/aws-lambda-go v1.13.3 - github.com/aws/aws-sdk-go-v2 v1.18.0 + github.com/aws/aws-sdk-go-v2 v1.21.0 github.com/aws/aws-sdk-go-v2/config v1.17.7 github.com/aws/aws-sdk-go-v2/credentials v1.12.20 github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.26.0 - github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.15.5 + github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.24.0 github.com/aws/aws-sdk-go-v2/service/costexplorer v1.18.4 github.com/aws/aws-sdk-go-v2/service/ec2 v1.36.1 github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.18.4 @@ -198,7 +198,7 @@ require ( github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.33 github.com/aws/aws-sdk-go-v2/service/cloudformation v1.20.4 github.com/aws/aws-sdk-go-v2/service/kinesis v1.15.8 - github.com/aws/smithy-go v1.13.5 + github.com/aws/smithy-go v1.14.2 github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5 github.com/elastic/bayeux v1.0.5 github.com/elastic/elastic-agent-autodiscover v0.6.2 @@ -249,8 +249,8 @@ require ( github.com/armon/go-radix v1.0.0 // indirect github.com/aws/aws-sdk-go v1.38.60 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.8 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.41 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.35 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.3.24 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.14 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.9 // indirect diff --git a/go.sum b/go.sum index 9249a6758da4..54b15f019599 100644 --- a/go.sum +++ b/go.sum @@ -287,6 +287,8 @@ github.com/aws/aws-sdk-go-v2 v1.16.6/go.mod h1:6CpKuLXg2w7If3ABZCl/qZ6rEgwtjZTn4 github.com/aws/aws-sdk-go-v2 v1.16.16/go.mod h1:SwiyXi/1zTUZ6KIAmLK5V5ll8SiURNUYOqTerZPaF9k= github.com/aws/aws-sdk-go-v2 v1.18.0 h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY= github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= +github.com/aws/aws-sdk-go-v2 v1.21.0 h1:gMT0IW+03wtYJhRqTVYn0wLzwdnK9sRMcxmtfGzRdJc= +github.com/aws/aws-sdk-go-v2 v1.21.0/go.mod h1:/RfNgGmRxI+iFOB1OeJUyxiU+9s88k3pfHvDagGEp0M= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.3/go.mod h1:gNsR5CaXKmQSSzrmGxmwmct/r+ZBfbxorAuXYsj/M5Y= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.8 h1:tcFliCWne+zOuUfKNRn8JdFBuWPDuISDH08wD2ULkhk= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.8/go.mod h1:JTnlBSot91steJeti4ryyu/tLd4Sk84O5W22L7O2EQU= @@ -303,11 +305,15 @@ github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.13/go.mod h1:wLLesU+LdM github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.23/go.mod h1:2DFxAQ9pfIRy0imBCJv+vZ2X6RKxves6fbnEuSry6b4= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 h1:kG5eQilShqmJbv11XL1VpyDbaEJzWxd4zRiCG30GSn4= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.41 h1:22dGT7PneFMx4+b3pz7lMTRyN8ZKH7M2cW4GP9yUS2g= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.41/go.mod h1:CrObHAuPneJBlfEJ5T3szXOUkLEThaGfvnhTf33buas= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.4/go.mod h1:8glyUqVIM4AmeenIsPo0oVh3+NUwnsQml2OFupfQW+0= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.7/go.mod h1:93Uot80ddyVzSl//xEJreNKMhxntr71WtR3v/A1cRYk= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.17/go.mod h1:pRwaTYCJemADaqCbUAxltMoHKata7hmB5PjEXeu0kfg= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 h1:vFQlirhuM8lLlpI7imKOMsjdQLuN9CPi+k44F/OFVsk= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.35 h1:SijA0mgjV8E+8G45ltVHs0fvKpTj8xmZJ3VwhGKtUSI= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.35/go.mod h1:SJC1nEVVva1g3pHAIdCp7QsRIkMmLAgoDquQ9Rr8kYw= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.24 h1:wj5Rwc05hvUSvKuOF29IYb9QrCLjU+rHAy/x/o0DK2c= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.24/go.mod h1:jULHjqqjDlbyTa7pfM7WICATnOv+iOhjletM3N0Xbu8= github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.14 h1:ZSIPAkAsCCjYrhqfw2+lNzWDzxzHXEckFkTePL5RSWQ= @@ -318,6 +324,8 @@ github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.26.0 h1:sSzrsKQULJmPtmu6By4wR github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.26.0/go.mod h1:t5mizLPjCYafXoHCXOHJU7z4OvLbY70Echvb1ciBTV4= github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.15.5 h1:aPK8IBVKeozo/pNGshT8xOJ2V3Y7ykOM49QcY0vhUSM= github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.15.5/go.mod h1:ErjxucZaraVbYm66xxub00qmGBw7md2RFqy6624KbR8= +github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.24.0 h1:6LRil7J+uh2SZ58Wkm/5aVRpBOZbTtwi8p8gdsix94c= +github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.24.0/go.mod h1:5v2ZNXCSwG73rx0k3sCuB1Ju8sbEbG0iUlxCA7D8sV8= github.com/aws/aws-sdk-go-v2/service/costexplorer v1.18.4 h1:jbfG3cbq1kiK1/OAfUh4zf1ADtAU8KoeOPfF94S96pU= github.com/aws/aws-sdk-go-v2/service/costexplorer v1.18.4/go.mod h1:yC5cDNa3xzSh5NIU5x0NBBo6QkcsaM0tuPNCczeUPoU= github.com/aws/aws-sdk-go-v2/service/ec2 v1.36.1 h1:FS8Ja6LuLDVHcX+rmoNpOXqYb52N2A5DwQy7Dgduq4Q= @@ -360,6 +368,8 @@ github.com/aws/smithy-go v1.12.0/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J github.com/aws/smithy-go v1.13.3/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= +github.com/aws/smithy-go v1.14.2 h1:MJU9hqBGbvWZdApzpvoF2WAIJDbtjK2NDJSiJP7HblQ= +github.com/aws/smithy-go v1.14.2/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/awslabs/goformation/v3 v3.1.0/go.mod h1:hQ5RXo3GNm2laHWKizDzU5DsDy+yNcenSca2UxN0850= github.com/awslabs/goformation/v4 v4.1.0 h1:JRxIW0IjhYpYDrIZOTJGMu2azXKI+OK5dP56ubpywGU= github.com/awslabs/goformation/v4 v4.1.0/go.mod h1:MBDN7u1lMNDoehbFuO4uPvgwPeolTMA2TzX1yO6KlxI= diff --git a/x-pack/filebeat/input/awscloudwatch/cloudwatch.go b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go index ca54721bd279..6737fbb853dd 100644 --- a/x-pack/filebeat/input/awscloudwatch/cloudwatch.go +++ b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go @@ -56,8 +56,8 @@ func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics, } } -func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) { - err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor) +func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroupARN string, startTime int64, endTime int64, logProcessor *logProcessor) { + err := p.getLogEventsFromCloudWatch(svc, logGroupARN, startTime, endTime, logProcessor) if err != nil { var errRequestCanceled *awssdk.RequestCanceledError if errors.As(err, &errRequestCanceled) { @@ -68,9 +68,9 @@ func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, star } // getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch -func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) error { +func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroupARN string, startTime int64, endTime int64, logProcessor *logProcessor) error { // construct FilterLogEventsInput - filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup) + filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroupARN) paginator := cloudwatchlogs.NewFilterLogEventsPaginator(svc, filterLogEventsInput) for paginator.HasMorePages() { filterLogEventsOutput, err := paginator.NextPage(context.TODO()) @@ -88,14 +88,14 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client p.log.Debug("done sleeping") p.log.Debugf("Processing #%v events", len(logEvents)) - logProcessor.processLogEvents(logEvents, logGroup, p.region) + logProcessor.processLogEvents(logEvents, logGroupARN, p.region) } return nil } -func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput { +func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroupARN string) *cloudwatchlogs.FilterLogEventsInput { filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{ - LogGroupName: awssdk.String(logGroup), + LogGroupIdentifier: awssdk.String(logGroupARN), StartTime: awssdk.Int64(startTime), EndTime: awssdk.Int64(endTime), } diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index f9d69fe1184f..84b05d3a8ac6 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -126,9 +126,9 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) } }) - logGroupNames, err := getLogGroupNames(svc, in.config.LogGroupNamePrefix, in.config.LogGroupName) + logGroupARNs, err := getLogGroupARNs(svc, in.config.LogGroupNamePrefix, in.config.LogGroupName) if err != nil { - return fmt.Errorf("failed to get log group names: %w", err) + return fmt.Errorf("failed to get log group ARNs: %w", err) } log := inputContext.Logger @@ -143,11 +143,11 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) in.config.LogStreams, in.config.LogStreamPrefix) logProcessor := newLogProcessor(log.Named("log_processor"), in.metrics, client, ctx) - cwPoller.metrics.logGroupsTotal.Add(uint64(len(logGroupNames))) - return in.Receive(svc, cwPoller, ctx, logProcessor, logGroupNames) + cwPoller.metrics.logGroupsTotal.Add(uint64(len(logGroupARNs))) + return in.Receive(svc, cwPoller, ctx, logProcessor, logGroupARNs) } -func (in *cloudwatchInput) Receive(svc *cloudwatchlogs.Client, cwPoller *cloudwatchPoller, ctx context.Context, logProcessor *logProcessor, logGroupNames []string) error { +func (in *cloudwatchInput) Receive(svc *cloudwatchlogs.Client, cwPoller *cloudwatchPoller, ctx context.Context, logProcessor *logProcessor, logGroupARNs []string) error { // This loop tries to keep the workers busy as much as possible while // honoring the number in config opposed to a simpler loop that does one // listing, sequentially processes every object and then does another listing @@ -175,17 +175,17 @@ func (in *cloudwatchInput) Receive(svc *cloudwatchlogs.Client, cwPoller *cloudwa } workerWg.Add(availableWorkers) - logGroupNamesLength := len(logGroupNames) + logGroupARNsLength := len(logGroupARNs) runningGoroutines := 0 - for i := lastLogGroupOffset; i < logGroupNamesLength; i++ { + for i := lastLogGroupOffset; i < logGroupARNsLength; i++ { if runningGoroutines >= availableWorkers { break } runningGoroutines++ lastLogGroupOffset = i + 1 - if lastLogGroupOffset >= logGroupNamesLength { + if lastLogGroupOffset >= logGroupARNsLength { // release unused workers cwPoller.workerSem.Release(availableWorkers - runningGoroutines) for j := 0; j < availableWorkers-runningGoroutines; j++ { @@ -194,15 +194,15 @@ func (in *cloudwatchInput) Receive(svc *cloudwatchlogs.Client, cwPoller *cloudwa lastLogGroupOffset = 0 } - lg := logGroupNames[i] - go func(logGroup string, startTime int64, endTime int64) { + lg := logGroupARNs[i] + go func(logGroupARN string, startTime int64, endTime int64) { defer func() { - cwPoller.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", logGroup) + cwPoller.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", logGroupARN) workerWg.Done() cwPoller.workerSem.Release(1) }() - cwPoller.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", logGroup) - cwPoller.run(svc, logGroup, startTime, endTime, logProcessor) + cwPoller.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", logGroupARN) + cwPoller.run(svc, logGroupARN, startTime, endTime, logProcessor) }(lg, cwPoller.startTime, cwPoller.endTime) } } @@ -231,8 +231,8 @@ func parseARN(logGroupARN string) (string, string, error) { return "", "", fmt.Errorf("cannot get log group name from log group ARN: %s", logGroupARN) } -// getLogGroupNames uses DescribeLogGroups API to retrieve all log group names -func getLogGroupNames(svc *cloudwatchlogs.Client, logGroupNamePrefix string, logGroupName string) ([]string, error) { +// getLogGroupARNs uses DescribeLogGroups API to retrieve all log group names +func getLogGroupARNs(svc *cloudwatchlogs.Client, logGroupNamePrefix string, logGroupName string) ([]string, error) { if logGroupNamePrefix == "" { return []string{logGroupName}, nil } @@ -243,7 +243,7 @@ func getLogGroupNames(svc *cloudwatchlogs.Client, logGroupNamePrefix string, log } // make API request - var logGroupNames []string + var logGroupARNs []string paginator := cloudwatchlogs.NewDescribeLogGroupsPaginator(svc, describeLogGroupsInput) for paginator.HasMorePages() { page, err := paginator.NextPage(context.TODO()) @@ -252,10 +252,10 @@ func getLogGroupNames(svc *cloudwatchlogs.Client, logGroupNamePrefix string, log } for _, lg := range page.LogGroups { - logGroupNames = append(logGroupNames, *lg.LogGroupName) + logGroupARNs = append(logGroupARNs, *lg.Arn) } } - return logGroupNames, nil + return logGroupARNs, nil } func getStartPosition(startPosition string, currentTime time.Time, endTime int64, scanFrequency time.Duration, latency time.Duration) (int64, int64) { diff --git a/x-pack/filebeat/input/awscloudwatch/processor.go b/x-pack/filebeat/input/awscloudwatch/processor.go index 999cad4d7f0f..883f59edcd42 100644 --- a/x-pack/filebeat/input/awscloudwatch/processor.go +++ b/x-pack/filebeat/input/awscloudwatch/processor.go @@ -35,9 +35,9 @@ func newLogProcessor(log *logp.Logger, metrics *inputMetrics, publisher beat.Cli } } -func (p *logProcessor) processLogEvents(logEvents []types.FilteredLogEvent, logGroup string, regionName string) { +func (p *logProcessor) processLogEvents(logEvents []types.FilteredLogEvent, logGroupARN string, regionName string) { for _, logEvent := range logEvents { - event := createEvent(logEvent, logGroup, regionName) + event := createEvent(logEvent, logGroupARN, regionName) p.publish(p.ack, &event) } } @@ -49,7 +49,11 @@ func (p *logProcessor) publish(ack *awscommon.EventACKTracker, event *beat.Event p.publisher.Publish(*event) } -func createEvent(logEvent types.FilteredLogEvent, logGroup string, regionName string) beat.Event { +func createEvent(logEvent types.FilteredLogEvent, logGroupARN string, regionName string) beat.Event { + logGroup, _, err := parseARN(logGroupARN) + if err != nil { + // ???? + } event := beat.Event{ Timestamp: time.Unix(*logEvent.Timestamp/1000, 0).UTC(), Fields: mapstr.M{