Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] [AWS] Add support to source AWS cloudwatch logs from linked accounts #41188

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Improved GCS input documentation. {pull}41143[41143]
- Add CSV decoding capacity to azureblobstorage input {pull}40978[40978]
- Add CSV decoding capacity to gcs input {pull}40979[40979]
- Add support to source AWS cloudwatch logs from linked accounts. {pull}41188[41188]
- Journald input now supports filtering by facilities {pull}41061[41061]
- System module now supports reading from journald. {pull}41061[41061]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@
#credential_profile_name: test-aws-s3-input

# ARN of the log group to collect logs from
# This ARN could refer to a log group from a linked source account
# Note: This property takes precedence over `log_group_name` & `log_group_name_prefix`
#log_group_arn: "arn:aws:logs:us-east-1:428152502467:log-group:test:*"

# Name of the log group to collect logs from.
Expand Down
18 changes: 14 additions & 4 deletions x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,26 @@ The `aws-cloudwatch` input supports the following configuration options plus the
[float]
==== `log_group_arn`
ARN of the log group to collect logs from.
The ARN may refer to a log group in a linked source account.
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved

Note: `log_group_arn` cannot be combined with the `log_group_name`, `log_group_name_prefix`, and `region_name` properties.
If set, values extracted from `log_group_arn` take precedence over them.

Note: If the log group is in a linked source account and filebeat is configured to use a monitoring account, you must use the `log_group_arn`.
You can read more about AWS account linking and cross account observability from the https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Unified-Cross-Account.html[official documentation].

[float]
==== `log_group_name`
Name of the log group to collect logs from. Note: `region_name` is required when
log_group_name is given.
Name of the log group to collect logs from.

Note: `region_name` is required when log_group_name is given.

[float]
==== `log_group_name_prefix`
The prefix for a group of log group names. Note: `region_name` is required when
log_group_name_prefix is given. `log_group_name` and `log_group_name_prefix`
The prefix for a group of log group names.

Note: `region_name` is required when
`log_group_name_prefix` is given. `log_group_name` and `log_group_name_prefix`
cannot be given at the same time. The number of workers that will process the
log groups under this prefix is set through the `number_of_workers` config.

Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3069,6 +3069,8 @@ filebeat.inputs:
#credential_profile_name: test-aws-s3-input

# ARN of the log group to collect logs from
# This ARN could refer to a log group from a linked source account
# Note: This property takes precedence over `log_group_name` & `log_group_name_prefix`
#log_group_arn: "arn:aws:logs:us-east-1:428152502467:log-group:test:*"

# Name of the log group to collect logs from.
Expand Down
36 changes: 18 additions & 18 deletions x-pack/filebeat/input/awscloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type cloudwatchPoller struct {
}

type workResponse struct {
logGroup string
logGroupId string
startTime, endTime time.Time
}

Expand All @@ -64,8 +64,8 @@ func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics,
}
}

func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) {
err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor)
func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroupId string, startTime, endTime time.Time, logProcessor *logProcessor) {
err := p.getLogEventsFromCloudWatch(svc, logGroupId, startTime, endTime, logProcessor)
if err != nil {
var errRequestCanceled *awssdk.RequestCanceledError
if errors.As(err, &errRequestCanceled) {
Expand All @@ -76,9 +76,9 @@ func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, star
}

// getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch
func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) error {
func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroupId string, startTime, endTime time.Time, logProcessor *logProcessor) error {
// construct FilterLogEventsInput
filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup)
filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroupId)
paginator := cloudwatchlogs.NewFilterLogEventsPaginator(svc, filterLogEventsInput)
for paginator.HasMorePages() {
filterLogEventsOutput, err := paginator.NextPage(context.TODO())
Expand All @@ -96,16 +96,16 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client
p.log.Debug("done sleeping")

p.log.Debugf("Processing #%v events", len(logEvents))
logProcessor.processLogEvents(logEvents, logGroup, p.region)
logProcessor.processLogEvents(logEvents, logGroupId, p.region)
}
return nil
}

func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime, endTime time.Time, logGroup string) *cloudwatchlogs.FilterLogEventsInput {
func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime, endTime time.Time, logGroupId string) *cloudwatchlogs.FilterLogEventsInput {
filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{
LogGroupName: awssdk.String(logGroup),
StartTime: awssdk.Int64(unixMsFromTime(startTime)),
EndTime: awssdk.Int64(unixMsFromTime(endTime)),
LogGroupIdentifier: awssdk.String(logGroupId),
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
StartTime: awssdk.Int64(unixMsFromTime(startTime)),
EndTime: awssdk.Int64(unixMsFromTime(endTime)),
}

if len(p.config.LogStreams) > 0 {
Expand Down Expand Up @@ -138,9 +138,9 @@ func (p *cloudwatchPoller) startWorkers(
work = <-p.workResponseChan
}

p.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", work.logGroup)
p.run(svc, work.logGroup, work.startTime, work.endTime, logProcessor)
p.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", work.logGroup)
p.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", work.logGroupId)
p.run(svc, work.logGroupId, work.startTime, work.endTime, logProcessor)
p.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", work.logGroupId)
}
}()
}
Expand All @@ -149,7 +149,7 @@ func (p *cloudwatchPoller) startWorkers(
// receive implements the main run loop that distributes tasks to the worker
// goroutines. It accepts a "clock" callback (which on a live input should
// equal time.Now) to allow deterministic unit tests.
func (p *cloudwatchPoller) receive(ctx context.Context, logGroupNames []string, clock func() time.Time) {
func (p *cloudwatchPoller) receive(ctx context.Context, logGroupIDs []string, clock func() time.Time) {
defer p.workerWg.Wait()
// startTime and endTime are the bounds of the current scanning interval.
// If we're starting at the end of the logs, advance the start time to the
Expand All @@ -160,15 +160,15 @@ func (p *cloudwatchPoller) receive(ctx context.Context, logGroupNames []string,
startTime = endTime.Add(-p.config.ScanFrequency)
}
for ctx.Err() == nil {
for _, lg := range logGroupNames {
for _, lg := range logGroupIDs {
select {
case <-ctx.Done():
return
case <-p.workRequestChan:
p.workResponseChan <- workResponse{
logGroup: lg,
startTime: startTime,
endTime: endTime,
logGroupId: lg,
startTime: startTime,
endTime: endTime,
}
}
}
Expand Down
100 changes: 51 additions & 49 deletions x-pack/filebeat/input/awscloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type receiveTestStep struct {

type receiveTestCase struct {
name string
logGroups []string
logGroupIDs []string
configOverrides func(*config)
startTime time.Time
steps []receiveTestStep
Expand All @@ -46,128 +46,128 @@ func TestReceive(t *testing.T) {
t3 := t2.Add(time.Hour)
testCases := []receiveTestCase{
{
name: "Default config with one log group",
logGroups: []string{"a"},
startTime: t1,
name: "Default config with one log group",
logGroupIDs: []string{"a"},
startTime: t1,
steps: []receiveTestStep{
{
expected: []workResponse{
{logGroup: "a", startTime: t0, endTime: t1},
{logGroupId: "a", startTime: t0, endTime: t1},
},
nextTime: t2,
},
{
expected: []workResponse{
{logGroup: "a", startTime: t1, endTime: t2},
{logGroupId: "a", startTime: t1, endTime: t2},
},
nextTime: t3,
},
{
expected: []workResponse{
{logGroup: "a", startTime: t2, endTime: t3},
{logGroupId: "a", startTime: t2, endTime: t3},
},
},
},
},
{
name: "Default config with two log groups",
logGroups: []string{"a", "b"},
startTime: t1,
name: "Default config with two log groups",
logGroupIDs: []string{"a", "b"},
startTime: t1,
steps: []receiveTestStep{
{
expected: []workResponse{
{logGroup: "a", startTime: t0, endTime: t1},
{logGroupId: "a", startTime: t0, endTime: t1},
},
nextTime: t2,
},
{
expected: []workResponse{
// start/end times for the second log group should be the same
// even though the clock has changed.
{logGroup: "b", startTime: t0, endTime: t1},
{logGroupId: "b", startTime: t0, endTime: t1},
},
},
{
expected: []workResponse{
{logGroup: "a", startTime: t1, endTime: t2},
{logGroup: "b", startTime: t1, endTime: t2},
{logGroupId: "a", startTime: t1, endTime: t2},
{logGroupId: "b", startTime: t1, endTime: t2},
},
nextTime: t3,
},
{
expected: []workResponse{
{logGroup: "a", startTime: t2, endTime: t3},
{logGroup: "b", startTime: t2, endTime: t3},
{logGroupId: "a", startTime: t2, endTime: t3},
{logGroupId: "b", startTime: t2, endTime: t3},
},
},
},
},
{
name: "One log group with start_position: end",
logGroups: []string{"a"},
startTime: t1,
name: "One log group with start_position: end",
logGroupIDs: []string{"a"},
startTime: t1,
configOverrides: func(c *config) {
c.StartPosition = "end"
},
steps: []receiveTestStep{
{
expected: []workResponse{
{logGroup: "a", startTime: t1.Add(-defaultScanFrequency), endTime: t1},
{logGroupId: "a", startTime: t1.Add(-defaultScanFrequency), endTime: t1},
},
nextTime: t2,
},
{
expected: []workResponse{
{logGroup: "a", startTime: t1, endTime: t2},
{logGroupId: "a", startTime: t1, endTime: t2},
},
},
},
},
{
name: "Two log group with start_position: end and latency",
logGroups: []string{"a", "b"},
startTime: t1,
name: "Two log group with start_position: end and latency",
logGroupIDs: []string{"a", "b"},
startTime: t1,
configOverrides: func(c *config) {
c.StartPosition = "end"
c.Latency = time.Second
},
steps: []receiveTestStep{
{
expected: []workResponse{
{logGroup: "a", startTime: t1.Add(-defaultScanFrequency - time.Second), endTime: t1.Add(-time.Second)},
{logGroup: "b", startTime: t1.Add(-defaultScanFrequency - time.Second), endTime: t1.Add(-time.Second)},
{logGroupId: "a", startTime: t1.Add(-defaultScanFrequency - time.Second), endTime: t1.Add(-time.Second)},
{logGroupId: "b", startTime: t1.Add(-defaultScanFrequency - time.Second), endTime: t1.Add(-time.Second)},
},
nextTime: t2,
},
{
expected: []workResponse{
{logGroup: "a", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
{logGroup: "b", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
{logGroupId: "a", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
{logGroupId: "b", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
},
},
},
},
{
name: "Three log groups with latency",
logGroups: []string{"a", "b", "c"},
startTime: t1,
name: "Three log groups with latency",
logGroupIDs: []string{"a", "b", "c"},
startTime: t1,
configOverrides: func(c *config) {
c.Latency = time.Second
},
steps: []receiveTestStep{
{
expected: []workResponse{
{logGroup: "a", startTime: t0, endTime: t1.Add(-time.Second)},
{logGroup: "b", startTime: t0, endTime: t1.Add(-time.Second)},
{logGroup: "c", startTime: t0, endTime: t1.Add(-time.Second)},
{logGroupId: "a", startTime: t0, endTime: t1.Add(-time.Second)},
{logGroupId: "b", startTime: t0, endTime: t1.Add(-time.Second)},
{logGroupId: "c", startTime: t0, endTime: t1.Add(-time.Second)},
},
nextTime: t2,
},
{
expected: []workResponse{
{logGroup: "a", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
{logGroup: "b", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
{logGroup: "c", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
{logGroupId: "a", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
{logGroupId: "b", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
{logGroupId: "c", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
},
},
},
Expand All @@ -191,7 +191,7 @@ func TestReceive(t *testing.T) {
test.configOverrides(&p.config)
}
clock.time = test.startTime
go p.receive(ctx, test.logGroups, clock.now)
go p.receive(ctx, test.logGroupIDs, clock.now)
for _, step := range test.steps {
for i, expected := range step.expected {
p.workRequestChan <- struct{}{}
Expand All @@ -209,34 +209,36 @@ func TestReceive(t *testing.T) {
}

type filterLogEventsTestCase struct {
name string
logGroup string
startTime time.Time
endTime time.Time
expected *cloudwatchlogs.FilterLogEventsInput
name string
logGroupId string
startTime time.Time
endTime time.Time
expected *cloudwatchlogs.FilterLogEventsInput
}

func TestFilterLogEventsInput(t *testing.T) {
now, _ := time.Parse(time.RFC3339, "2024-07-12T13:00:00+00:00")
id := "myLogGroup"

testCases := []filterLogEventsTestCase{
{
name: "StartPosition: beginning, first iteration",
logGroup: "a",
name: "StartPosition: beginning, first iteration",
logGroupId: id,
// The zero value of type time.Time{} is January 1, year 1, 00:00:00.000000000 UTC
// Events with a timestamp before the time - January 1, 1970, 00:00:00 UTC are not returned by AWS API
// make sure zero value of time.Time{} was converted
startTime: time.Time{},
endTime: now,
expected: &cloudwatchlogs.FilterLogEventsInput{
LogGroupName: awssdk.String("a"),
StartTime: awssdk.Int64(0),
EndTime: awssdk.Int64(1720789200000),
LogGroupIdentifier: awssdk.String(id),
StartTime: awssdk.Int64(0),
EndTime: awssdk.Int64(1720789200000),
},
},
}
for _, test := range testCases {
p := cloudwatchPoller{}
result := p.constructFilterLogEventsInput(test.startTime, test.endTime, test.logGroup)
result := p.constructFilterLogEventsInput(test.startTime, test.endTime, test.logGroupId)
assert.Equal(t, test.expected, result)
}

Expand Down
Loading
Loading