Skip to content

Commit

Permalink
[Filebeat] [AWS] Add support to source AWS cloudwatch logs from linke…
Browse files Browse the repository at this point in the history
…d accounts (#41188)

* use LogGroupIdentifier fiter instead of LogGroupName and related parameter, field renaming

Signed-off-by: Kavindu Dodanduwa <[email protected]>

* configuration parsing to support arn & linked accounts

Signed-off-by: Kavindu Dodanduwa <[email protected]>

* document the ARN usage

Signed-off-by: Kavindu Dodanduwa <[email protected]>

* add changelog entry

Signed-off-by: Kavindu Dodanduwa <[email protected]>

* code review changes

Signed-off-by: Kavindu Dodanduwa <[email protected]>

* code review change - fix typo

Signed-off-by: Kavindu Dodanduwa <[email protected]>

---------

Signed-off-by: Kavindu Dodanduwa <[email protected]>
Co-authored-by: kaiyan-sheng <[email protected]>
(cherry picked from commit 42f2d41)
  • Loading branch information
Kavindu-Dodan authored and mergify[bot] committed Oct 15, 2024
1 parent 9812222 commit ae4ae95
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 116 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add CSV decoding capacity to azureblobstorage input {pull}40978[40978]
- Add CSV decoding capacity to gcs input {pull}40979[40979]
- Add CSV decoding capacity to azureblobstorage input {pull}40978[40978]
- Add support to source AWS cloudwatch logs from linked accounts. {pull}41188[41188]
- Jounrald input now supports filtering by facilities {pull}41061[41061]
- System module now supports reading from jounrald. {pull}41061[41061]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@
#credential_profile_name: test-aws-s3-input

# ARN of the log group to collect logs from
# This ARN could refer to a log group from a linked source account
# Note: This property precedes over `log_group_name` & `log_group_name_prefix`
#log_group_arn: "arn:aws:logs:us-east-1:428152502467:log-group:test:*"

# Name of the log group to collect logs from.
Expand Down
18 changes: 14 additions & 4 deletions x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,26 @@ The `aws-cloudwatch` input supports the following configuration options plus the
[float]
==== `log_group_arn`
ARN of the log group to collect logs from.
The ARN may refer to a log group in a linked source account.

Note: `log_group_arn` cannot be combined with `log_group_name`, `log_group_name_prefix` and `region_name` properties.
If set, values extracted from `log_group_arn` takes precedence over them.

Note: If the log group is in a linked source account and filebeat is configured to use a monitoring account, you must use the `log_group_arn`.
You can read more about AWS account linking and cross account observability from the https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Unified-Cross-Account.html[official documentation].

[float]
==== `log_group_name`
Name of the log group to collect logs from. Note: `region_name` is required when
log_group_name is given.
Name of the log group to collect logs from.

Note: `region_name` is required when log_group_name is given.

[float]
==== `log_group_name_prefix`
The prefix for a group of log group names. Note: `region_name` is required when
log_group_name_prefix is given. `log_group_name` and `log_group_name_prefix`
The prefix for a group of log group names.

Note: `region_name` is required when
`log_group_name_prefix` is given. `log_group_name` and `log_group_name_prefix`
cannot be given at the same time. The number of workers that will process the
log groups under this prefix is set through the `number_of_workers` config.

Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3069,6 +3069,8 @@ filebeat.inputs:
#credential_profile_name: test-aws-s3-input

# ARN of the log group to collect logs from
# This ARN could refer to a log group from a linked source account
# Note: This property precedes over `log_group_name` & `log_group_name_prefix`
#log_group_arn: "arn:aws:logs:us-east-1:428152502467:log-group:test:*"

# Name of the log group to collect logs from.
Expand Down
36 changes: 18 additions & 18 deletions x-pack/filebeat/input/awscloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type cloudwatchPoller struct {
}

type workResponse struct {
logGroup string
logGroupId string
startTime, endTime time.Time
}

Expand All @@ -64,8 +64,8 @@ func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics,
}
}

func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) {
err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor)
func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroupId string, startTime, endTime time.Time, logProcessor *logProcessor) {
err := p.getLogEventsFromCloudWatch(svc, logGroupId, startTime, endTime, logProcessor)
if err != nil {
var errRequestCanceled *awssdk.RequestCanceledError
if errors.As(err, &errRequestCanceled) {
Expand All @@ -76,9 +76,9 @@ func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, star
}

// getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch
func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) error {
func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroupId string, startTime, endTime time.Time, logProcessor *logProcessor) error {
// construct FilterLogEventsInput
filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup)
filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroupId)
paginator := cloudwatchlogs.NewFilterLogEventsPaginator(svc, filterLogEventsInput)
for paginator.HasMorePages() {
filterLogEventsOutput, err := paginator.NextPage(context.TODO())
Expand All @@ -96,16 +96,16 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client
p.log.Debug("done sleeping")

p.log.Debugf("Processing #%v events", len(logEvents))
logProcessor.processLogEvents(logEvents, logGroup, p.region)
logProcessor.processLogEvents(logEvents, logGroupId, p.region)
}
return nil
}

func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime, endTime time.Time, logGroup string) *cloudwatchlogs.FilterLogEventsInput {
func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime, endTime time.Time, logGroupId string) *cloudwatchlogs.FilterLogEventsInput {
filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{
LogGroupName: awssdk.String(logGroup),
StartTime: awssdk.Int64(unixMsFromTime(startTime)),
EndTime: awssdk.Int64(unixMsFromTime(endTime)),
LogGroupIdentifier: awssdk.String(logGroupId),
StartTime: awssdk.Int64(unixMsFromTime(startTime)),
EndTime: awssdk.Int64(unixMsFromTime(endTime)),
}

if len(p.config.LogStreams) > 0 {
Expand Down Expand Up @@ -138,9 +138,9 @@ func (p *cloudwatchPoller) startWorkers(
work = <-p.workResponseChan
}

p.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", work.logGroup)
p.run(svc, work.logGroup, work.startTime, work.endTime, logProcessor)
p.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", work.logGroup)
p.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", work.logGroupId)
p.run(svc, work.logGroupId, work.startTime, work.endTime, logProcessor)
p.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", work.logGroupId)
}
}()
}
Expand All @@ -149,7 +149,7 @@ func (p *cloudwatchPoller) startWorkers(
// receive implements the main run loop that distributes tasks to the worker
// goroutines. It accepts a "clock" callback (which on a live input should
// equal time.Now) to allow deterministic unit tests.
func (p *cloudwatchPoller) receive(ctx context.Context, logGroupNames []string, clock func() time.Time) {
func (p *cloudwatchPoller) receive(ctx context.Context, logGroupIDs []string, clock func() time.Time) {
defer p.workerWg.Wait()
// startTime and endTime are the bounds of the current scanning interval.
// If we're starting at the end of the logs, advance the start time to the
Expand All @@ -160,15 +160,15 @@ func (p *cloudwatchPoller) receive(ctx context.Context, logGroupNames []string,
startTime = endTime.Add(-p.config.ScanFrequency)
}
for ctx.Err() == nil {
for _, lg := range logGroupNames {
for _, lg := range logGroupIDs {
select {
case <-ctx.Done():
return
case <-p.workRequestChan:
p.workResponseChan <- workResponse{
logGroup: lg,
startTime: startTime,
endTime: endTime,
logGroupId: lg,
startTime: startTime,
endTime: endTime,
}
}
}
Expand Down
100 changes: 51 additions & 49 deletions x-pack/filebeat/input/awscloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type receiveTestStep struct {

type receiveTestCase struct {
name string
logGroups []string
logGroupIDs []string
configOverrides func(*config)
startTime time.Time
steps []receiveTestStep
Expand All @@ -46,128 +46,128 @@ func TestReceive(t *testing.T) {
t3 := t2.Add(time.Hour)
testCases := []receiveTestCase{
{
name: "Default config with one log group",
logGroups: []string{"a"},
startTime: t1,
name: "Default config with one log group",
logGroupIDs: []string{"a"},
startTime: t1,
steps: []receiveTestStep{
{
expected: []workResponse{
{logGroup: "a", startTime: t0, endTime: t1},
{logGroupId: "a", startTime: t0, endTime: t1},
},
nextTime: t2,
},
{
expected: []workResponse{
{logGroup: "a", startTime: t1, endTime: t2},
{logGroupId: "a", startTime: t1, endTime: t2},
},
nextTime: t3,
},
{
expected: []workResponse{
{logGroup: "a", startTime: t2, endTime: t3},
{logGroupId: "a", startTime: t2, endTime: t3},
},
},
},
},
{
name: "Default config with two log groups",
logGroups: []string{"a", "b"},
startTime: t1,
name: "Default config with two log groups",
logGroupIDs: []string{"a", "b"},
startTime: t1,
steps: []receiveTestStep{
{
expected: []workResponse{
{logGroup: "a", startTime: t0, endTime: t1},
{logGroupId: "a", startTime: t0, endTime: t1},
},
nextTime: t2,
},
{
expected: []workResponse{
// start/end times for the second log group should be the same
// even though the clock has changed.
{logGroup: "b", startTime: t0, endTime: t1},
{logGroupId: "b", startTime: t0, endTime: t1},
},
},
{
expected: []workResponse{
{logGroup: "a", startTime: t1, endTime: t2},
{logGroup: "b", startTime: t1, endTime: t2},
{logGroupId: "a", startTime: t1, endTime: t2},
{logGroupId: "b", startTime: t1, endTime: t2},
},
nextTime: t3,
},
{
expected: []workResponse{
{logGroup: "a", startTime: t2, endTime: t3},
{logGroup: "b", startTime: t2, endTime: t3},
{logGroupId: "a", startTime: t2, endTime: t3},
{logGroupId: "b", startTime: t2, endTime: t3},
},
},
},
},
{
name: "One log group with start_position: end",
logGroups: []string{"a"},
startTime: t1,
name: "One log group with start_position: end",
logGroupIDs: []string{"a"},
startTime: t1,
configOverrides: func(c *config) {
c.StartPosition = "end"
},
steps: []receiveTestStep{
{
expected: []workResponse{
{logGroup: "a", startTime: t1.Add(-defaultScanFrequency), endTime: t1},
{logGroupId: "a", startTime: t1.Add(-defaultScanFrequency), endTime: t1},
},
nextTime: t2,
},
{
expected: []workResponse{
{logGroup: "a", startTime: t1, endTime: t2},
{logGroupId: "a", startTime: t1, endTime: t2},
},
},
},
},
{
name: "Two log group with start_position: end and latency",
logGroups: []string{"a", "b"},
startTime: t1,
name: "Two log group with start_position: end and latency",
logGroupIDs: []string{"a", "b"},
startTime: t1,
configOverrides: func(c *config) {
c.StartPosition = "end"
c.Latency = time.Second
},
steps: []receiveTestStep{
{
expected: []workResponse{
{logGroup: "a", startTime: t1.Add(-defaultScanFrequency - time.Second), endTime: t1.Add(-time.Second)},
{logGroup: "b", startTime: t1.Add(-defaultScanFrequency - time.Second), endTime: t1.Add(-time.Second)},
{logGroupId: "a", startTime: t1.Add(-defaultScanFrequency - time.Second), endTime: t1.Add(-time.Second)},
{logGroupId: "b", startTime: t1.Add(-defaultScanFrequency - time.Second), endTime: t1.Add(-time.Second)},
},
nextTime: t2,
},
{
expected: []workResponse{
{logGroup: "a", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
{logGroup: "b", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
{logGroupId: "a", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
{logGroupId: "b", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
},
},
},
},
{
name: "Three log groups with latency",
logGroups: []string{"a", "b", "c"},
startTime: t1,
name: "Three log groups with latency",
logGroupIDs: []string{"a", "b", "c"},
startTime: t1,
configOverrides: func(c *config) {
c.Latency = time.Second
},
steps: []receiveTestStep{
{
expected: []workResponse{
{logGroup: "a", startTime: t0, endTime: t1.Add(-time.Second)},
{logGroup: "b", startTime: t0, endTime: t1.Add(-time.Second)},
{logGroup: "c", startTime: t0, endTime: t1.Add(-time.Second)},
{logGroupId: "a", startTime: t0, endTime: t1.Add(-time.Second)},
{logGroupId: "b", startTime: t0, endTime: t1.Add(-time.Second)},
{logGroupId: "c", startTime: t0, endTime: t1.Add(-time.Second)},
},
nextTime: t2,
},
{
expected: []workResponse{
{logGroup: "a", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
{logGroup: "b", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
{logGroup: "c", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
{logGroupId: "a", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
{logGroupId: "b", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
{logGroupId: "c", startTime: t1.Add(-time.Second), endTime: t2.Add(-time.Second)},
},
},
},
Expand All @@ -191,7 +191,7 @@ func TestReceive(t *testing.T) {
test.configOverrides(&p.config)
}
clock.time = test.startTime
go p.receive(ctx, test.logGroups, clock.now)
go p.receive(ctx, test.logGroupIDs, clock.now)
for _, step := range test.steps {
for i, expected := range step.expected {
p.workRequestChan <- struct{}{}
Expand All @@ -209,34 +209,36 @@ func TestReceive(t *testing.T) {
}

type filterLogEventsTestCase struct {
name string
logGroup string
startTime time.Time
endTime time.Time
expected *cloudwatchlogs.FilterLogEventsInput
name string
logGroupId string
startTime time.Time
endTime time.Time
expected *cloudwatchlogs.FilterLogEventsInput
}

func TestFilterLogEventsInput(t *testing.T) {
now, _ := time.Parse(time.RFC3339, "2024-07-12T13:00:00+00:00")
id := "myLogGroup"

testCases := []filterLogEventsTestCase{
{
name: "StartPosition: beginning, first iteration",
logGroup: "a",
name: "StartPosition: beginning, first iteration",
logGroupId: id,
// The zero value of type time.Time{} is January 1, year 1, 00:00:00.000000000 UTC
// Events with a timestamp before the time - January 1, 1970, 00:00:00 UTC are not returned by AWS API
// make sure zero value of time.Time{} was converted
startTime: time.Time{},
endTime: now,
expected: &cloudwatchlogs.FilterLogEventsInput{
LogGroupName: awssdk.String("a"),
StartTime: awssdk.Int64(0),
EndTime: awssdk.Int64(1720789200000),
LogGroupIdentifier: awssdk.String(id),
StartTime: awssdk.Int64(0),
EndTime: awssdk.Int64(1720789200000),
},
},
}
for _, test := range testCases {
p := cloudwatchPoller{}
result := p.constructFilterLogEventsInput(test.startTime, test.endTime, test.logGroup)
result := p.constructFilterLogEventsInput(test.startTime, test.endTime, test.logGroupId)
assert.Equal(t, test.expected, result)
}

Expand Down
Loading

0 comments on commit ae4ae95

Please sign in to comment.