
filter: aws metadata group retries #7245

Merged

Conversation

mwarzynski
Contributor

@mwarzynski mwarzynski commented Apr 22, 2023

This PR adds handling of errors received while fetching metadata groups in the Filter AWS functionality. It also allows defining the retry interval for when such a scenario occurs.

retry_interval_s   Defines the minimum duration (in seconds) between retries for fetching metadata groups
                   > default: 300, type: integer

Our current behaviour is to not inject any AWS metadata if fetching even one piece of information fails. This might not be the best approach, as most likely we would like to inject as much as we can. In other words, it would be better to have some additional AWS labels, if we can fetch them, rather than nothing at all.

For example, on current master, if the Instance Metadata server misbehaves and our implementation fails to fetch EC2 tags, then the output logs won't contain any AWS metadata labels, not even basic ones like the EC2 Instance ID. Logs pushed in such a way might be effectively useless, as the system won't be able to attribute them to a particular instance.

The changes proposed in this PR affect the fetching of all metadata groups (and failures to do so). For example, if fetching EC2 tags fails, the other AWS metadata is still injected into the logs, assuming Filter AWS is able to fetch it without errors. Retries for fetching the metadata groups are performed at an interval no smaller than the specified retry_interval_s.
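A minimal sketch of the retry gating described above (the struct and function names here are illustrative, not the PR's actual code): each metadata group keeps a "fetched" flag plus the timestamp of its last attempt, and a failed group is only retried once at least retry_interval_s seconds have passed.

```c
#include <stdbool.h>
#include <time.h>

/* Hypothetical per-group state; field names are illustrative. */
struct metadata_group {
    const char *name;
    bool fetched;        /* set once the group was retrieved successfully */
    time_t last_attempt; /* timestamp of the last fetch attempt */
};

/* Decide whether a not-yet-fetched group should be retried now,
 * honoring the configured minimum interval (retry_interval_s). */
static bool should_retry(const struct metadata_group *g,
                         int retry_interval_s, time_t now)
{
    if (g->fetched) {
        return false; /* group already injected, nothing to do */
    }
    if (now - g->last_attempt < retry_interval_s) {
        return false; /* too soon; keep injecting the labels we do have */
    }
    return true;
}
```

With retry_interval_s set to 5, as in the example configuration below, this yields at most one fetch attempt per group every 5 seconds while the other groups keep being injected.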

PR is related to the following comment: #7129 (comment).


IIRC from my testing that you already handle the tag server 404 not enabled gracefully? But not any other case.

Yes, we handle 404 errors gracefully, and a 404 has no effect on other AWS metadata. In case of a 404, we just mark tags as fetched and proceed (i.e. it is treated the same as an instance with 0 tags).
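The status-code distinction above can be sketched as a small predicate (the function name is illustrative): 200 and 404 are both terminal for the tags group, while any other status leaves it unfetched so it is retried later.

```c
#include <stdbool.h>

/* Hypothetical classification of the tag-endpoint HTTP status.
 * Returns true when the tags group should be marked fetched and
 * never retried. */
static bool tags_fetch_settled(int http_status)
{
    /* 200: tags fetched; 404: tag access disabled or zero tags --
     * both are terminal, matching the graceful handling described above.
     * Anything else (e.g. the 500s in the log below) leaves the group
     * unfetched, so it is retried after retry_interval_s. */
    return http_status == 200 || http_status == 404;
}
```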


Example configuration

[SERVICE]
    flush        1
    daemon       Off
    log_level    debug


[INPUT]
    name stdin

[FILTER]
    Name aws
    imds_version v1
    Match *
    ec2_instance_id true
    tags_enabled true
    retry_interval_s 5

[OUTPUT]
    name  stdout
    match *

Debug log output

Fluent Bit v2.1.5
* Copyright (C) 2015-2022 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2023/06/20 23:11:23] [ info] Configuration:
[2023/06/20 23:11:23] [ info]  flush time     | 1.000000 seconds
[2023/06/20 23:11:23] [ info]  grace          | 5 seconds
[2023/06/20 23:11:23] [ info]  daemon         | 0
[2023/06/20 23:11:23] [ info] ___________
[2023/06/20 23:11:23] [ info]  inputs:
[2023/06/20 23:11:23] [ info]      stdin
[2023/06/20 23:11:23] [ info] ___________
[2023/06/20 23:11:23] [ info]  filters:
[2023/06/20 23:11:23] [ info]      aws.0
[2023/06/20 23:11:23] [ info] ___________
[2023/06/20 23:11:23] [ info]  outputs:
[2023/06/20 23:11:23] [ info]      stdout.0
[2023/06/20 23:11:23] [ info] ___________
[2023/06/20 23:11:23] [ info]  collectors:
[2023/06/20 23:11:23] [ info] [fluent bit] version=2.1.5, commit=1f9e707217, pid=2641
[2023/06/20 23:11:23] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2023/06/20 23:11:23] [ info] [storage] ver=1.4.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/06/20 23:11:23] [ info] [cmetrics] version=0.6.2
[2023/06/20 23:11:23] [ info] [ctraces ] version=0.3.1
[2023/06/20 23:11:23] [ info] [input:stdin:stdin.0] initializing
[2023/06/20 23:11:23] [ info] [input:stdin:stdin.0] storage_strategy='memory' (memory only)
[2023/06/20 23:11:23] [debug] [stdin:stdin.0] created event channels: read=21 write=22
[2023/06/20 23:11:23] [debug] [input:stdin:stdin.0] buf_size=16000
[2023/06/20 23:11:23] [debug] [imds] using IMDSv1
[2023/06/20 23:11:23] [debug] [http_client] not using http_proxy for header
[2023/06/20 23:11:23] [debug] [imds] using IMDSv1
[2023/06/20 23:11:23] [debug] [http_client] not using http_proxy for header
[2023/06/20 23:11:23] [debug] [imds] using IMDSv1
[2023/06/20 23:11:23] [debug] [http_client] not using http_proxy for header
[2023/06/20 23:11:23] [debug] [aws_client] (null): http_do=0, HTTP Status: 500
[2023/06/20 23:11:23] [error] [filter:aws:aws.0] Failed to get instance EC2 Tags
[2023/06/20 23:11:23] [debug] [stdout:stdout.0] created event channels: read=24 write=25
[2023/06/20 23:11:23] [ info] [sp] stream processor started
[2023/06/20 23:11:23] [ info] [output:stdout:stdout.0] worker #0 started
[2023/06/20 23:11:23] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:24] [debug] [task] created task=0x7f6a68024ad0 id=0 OK
[2023/06/20 23:11:24] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302683.893142993, {}], {"log"=>"emfavbsxim", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:24] [debug] [out flush] cb_destroy coro_id=0
[2023/06/20 23:11:24] [debug] [task] destroy task=0x7f6a68024ad0 (task_id=0)
[2023/06/20 23:11:24] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:24] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:25] [debug] [task] created task=0x7f6a68028e60 id=0 OK
[2023/06/20 23:11:25] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302684.374381047, {}], {"log"=>"wowducooxe", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302684.875083562, {}], {"log"=>"yudtfbsgvl", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:25] [debug] [out flush] cb_destroy coro_id=1
[2023/06/20 23:11:25] [debug] [task] destroy task=0x7f6a68028e60 (task_id=0)
[2023/06/20 23:11:25] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:25] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:26] [debug] [task] created task=0x7f6a68029180 id=0 OK
[2023/06/20 23:11:26] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302685.375792267, {}], {"log"=>"lsniojweym", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302685.876517552, {}], {"log"=>"wfwobquqbo", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:26] [debug] [out flush] cb_destroy coro_id=2
[2023/06/20 23:11:26] [debug] [task] destroy task=0x7f6a68029180 (task_id=0)
[2023/06/20 23:11:26] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:26] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:27] [debug] [task] created task=0x7f6a68014620 id=0 OK
[2023/06/20 23:11:27] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302686.377204802, {}], {"log"=>"bqyshdcqyf", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302686.877946820, {}], {"log"=>"jfdkamekqc", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:27] [debug] [out flush] cb_destroy coro_id=3
[2023/06/20 23:11:27] [debug] [task] destroy task=0x7f6a68014620 (task_id=0)
[2023/06/20 23:11:27] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:27] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:28] [debug] [task] created task=0x7f6a68014750 id=0 OK
[2023/06/20 23:11:28] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302687.378669569, {}], {"log"=>"wnsxlhteid", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302687.879385922, {}], {"log"=>"jwpgcxkjtt", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:28] [debug] [out flush] cb_destroy coro_id=4
[2023/06/20 23:11:28] [debug] [task] destroy task=0x7f6a68014750 (task_id=0)
[2023/06/20 23:11:28] [debug] [imds] using IMDSv1
[2023/06/20 23:11:28] [debug] [http_client] not using http_proxy for header
[2023/06/20 23:11:28] [debug] [aws_client] (null): http_do=0, HTTP Status: 500
[2023/06/20 23:11:28] [error] [filter:aws:aws.0] Failed to get instance EC2 Tags
[2023/06/20 23:11:28] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:28] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:29] [debug] [task] created task=0x7f6a68015500 id=0 OK
[2023/06/20 23:11:29] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302688.380092365, {}], {"log"=>"gqshkfxqkf", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302688.880844758, {}], {"log"=>"ezlfgynfls", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:29] [debug] [out flush] cb_destroy coro_id=5
[2023/06/20 23:11:29] [debug] [task] destroy task=0x7f6a68015500 (task_id=0)
[2023/06/20 23:11:29] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:29] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:30] [debug] [task] created task=0x7f6a68015600 id=0 OK
[2023/06/20 23:11:30] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302689.381531701, {}], {"log"=>"yuvjowigrg", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302689.882267085, {}], {"log"=>"fsicnaqjmj", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:30] [debug] [out flush] cb_destroy coro_id=6
[2023/06/20 23:11:30] [debug] [task] destroy task=0x7f6a68015600 (task_id=0)
[2023/06/20 23:11:30] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:30] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:31] [debug] [task] created task=0x7f6a68015770 id=0 OK
[2023/06/20 23:11:31] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302690.382976576, {}], {"log"=>"uuvrsyqjez", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302690.883703719, {}], {"log"=>"skpdkpcwoz", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:31] [debug] [out flush] cb_destroy coro_id=7
[2023/06/20 23:11:31] [debug] [task] destroy task=0x7f6a68015770 (task_id=0)
[2023/06/20 23:11:31] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:31] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:32] [debug] [task] created task=0x7f6a68015770 id=0 OK
[2023/06/20 23:11:32] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302691.384421041, {}], {"log"=>"umrnngyscm", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302691.885152211, {}], {"log"=>"wjmlmvhzen", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:32] [debug] [out flush] cb_destroy coro_id=8
[2023/06/20 23:11:32] [debug] [task] destroy task=0x7f6a68015770 (task_id=0)
[2023/06/20 23:11:32] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:32] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[0] stdin.0: [[1687302692.385868793, {}], {"log"=>"ufmlfusukc", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:33] [debug] [task] created task=0x7f6a68015770 id=0 OK
[1] stdin.0: [[1687302692.886602452, {}], {"log"=>"gjipkoiuej", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:33] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2023/06/20 23:11:33] [debug] [out flush] cb_destroy coro_id=9
[2023/06/20 23:11:33] [debug] [task] destroy task=0x7f6a68015770 (task_id=0)
[2023/06/20 23:11:33] [debug] [imds] using IMDSv1
[2023/06/20 23:11:33] [debug] [http_client] not using http_proxy for header
[2023/06/20 23:11:33] [debug] [aws_client] (null): http_do=0, HTTP Status: 500
[2023/06/20 23:11:33] [error] [filter:aws:aws.0] Failed to get instance EC2 Tags
[2023/06/20 23:11:33] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:33] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:34] [debug] [task] created task=0x7f6a680158c0 id=0 OK
[2023/06/20 23:11:34] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302693.387297097, {}], {"log"=>"mivzwiavcb", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302693.887836380, {}], {"log"=>"lgaxrstavl", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:34] [debug] [out flush] cb_destroy coro_id=10
[2023/06/20 23:11:34] [debug] [task] destroy task=0x7f6a680158c0 (task_id=0)
[2023/06/20 23:11:34] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:34] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:35] [debug] [task] created task=0x7f6a68014ae0 id=0 OK
[2023/06/20 23:11:35] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302694.388522201, {}], {"log"=>"omghhpnwkh", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302694.889260971, {}], {"log"=>"lvwjoyyiiu", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:35] [debug] [out flush] cb_destroy coro_id=11
[2023/06/20 23:11:35] [debug] [task] destroy task=0x7f6a68014ae0 (task_id=0)
[2023/06/20 23:11:35] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:35] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:36] [debug] [task] created task=0x7f6a68014ae0 id=0 OK
[2023/06/20 23:11:36] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302695.389975260, {}], {"log"=>"jhqldnhpsc", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302695.890727701, {}], {"log"=>"rbccjnnsuy", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:36] [debug] [out flush] cb_destroy coro_id=12
[2023/06/20 23:11:36] [debug] [task] destroy task=0x7f6a68014ae0 (task_id=0)
[2023/06/20 23:11:36] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:36] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:37] [debug] [task] created task=0x7f6a68014ae0 id=0 OK
[2023/06/20 23:11:37] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302696.391431990, {}], {"log"=>"tocfeqeimb", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302696.892173635, {}], {"log"=>"ioidqivahk", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:37] [debug] [out flush] cb_destroy coro_id=13
[2023/06/20 23:11:37] [debug] [task] destroy task=0x7f6a68014ae0 (task_id=0)
[2023/06/20 23:11:37] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:37] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:38] [debug] [task] created task=0x7f6a68014ae0 id=0 OK
[2023/06/20 23:11:38] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302697.392976327, {}], {"log"=>"mfkdikyeqk", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302697.893695426, {}], {"log"=>"bfntmmyyfl", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:38] [debug] [out flush] cb_destroy coro_id=14
[2023/06/20 23:11:38] [debug] [task] destroy task=0x7f6a68014ae0 (task_id=0)
[2023/06/20 23:11:38] [debug] [imds] using IMDSv1
[2023/06/20 23:11:38] [debug] [http_client] not using http_proxy for header
[2023/06/20 23:11:38] [debug] [aws_client] (null): http_do=0, HTTP Status: 500
[2023/06/20 23:11:38] [error] [filter:aws:aws.0] Failed to get instance EC2 Tags
[2023/06/20 23:11:38] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:38] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:39] [debug] [task] created task=0x7f6a680146a0 id=0 OK
[2023/06/20 23:11:39] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302698.394444930, {}], {"log"=>"xvkyupyiaf", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302698.895156811, {}], {"log"=>"tyhmasfucu", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:39] [debug] [out flush] cb_destroy coro_id=15
[2023/06/20 23:11:39] [debug] [task] destroy task=0x7f6a680146a0 (task_id=0)
[2023/06/20 23:11:39] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:39] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:40] [debug] [task] created task=0x7f6a68014e20 id=0 OK
[2023/06/20 23:11:40] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302699.395877353, {}], {"log"=>"smmaqjfvvb", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302699.896595048, {}], {"log"=>"rfapheuxnw", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:40] [debug] [out flush] cb_destroy coro_id=16
[2023/06/20 23:11:40] [debug] [task] destroy task=0x7f6a68014e20 (task_id=0)
[2023/06/20 23:11:40] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:40] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:41] [debug] [task] created task=0x7f6a68014e20 id=0 OK
[2023/06/20 23:11:41] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302700.397177930, {}], {"log"=>"csivjjqtbm", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302700.897929948, {}], {"log"=>"cqbwanmvbo", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:41] [debug] [out flush] cb_destroy coro_id=17
[2023/06/20 23:11:41] [debug] [task] destroy task=0x7f6a68014e20 (task_id=0)
[2023/06/20 23:11:41] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:41] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:42] [debug] [task] created task=0x7f6a68015670 id=0 OK
[0] stdin.0: [[1687302701.398645419, {}], {"log"=>"qrgxcwuvdj", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:42] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[1] stdin.0: [[1687302701.899367671, {}], {"log"=>"jdenrvbrdg", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:42] [debug] [out flush] cb_destroy coro_id=18
[2023/06/20 23:11:42] [debug] [task] destroy task=0x7f6a68015670 (task_id=0)
[2023/06/20 23:11:42] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:42] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:43] [debug] [task] created task=0x7f6a68014db0 id=0 OK
[2023/06/20 23:11:43] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302702.400086133, {}], {"log"=>"xzhrixezzn", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302702.900802870, {}], {"log"=>"tzyohbknky", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:43] [debug] [out flush] cb_destroy coro_id=19
[2023/06/20 23:11:43] [debug] [task] destroy task=0x7f6a68014db0 (task_id=0)
[2023/06/20 23:11:43] [debug] [imds] using IMDSv1
[2023/06/20 23:11:43] [debug] [http_client] not using http_proxy for header
[2023/06/20 23:11:43] [debug] [aws_client] (null): http_do=0, HTTP Status: 500
[2023/06/20 23:11:43] [error] [filter:aws:aws.0] Failed to get instance EC2 Tags
[2023/06/20 23:11:43] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:43] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:44] [debug] [task] created task=0x7f6a68015110 id=0 OK
[2023/06/20 23:11:44] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302703.401526941, {}], {"log"=>"qnszuutxyx", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302703.902202719, {}], {"log"=>"zdhiiabprw", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:44] [debug] [out flush] cb_destroy coro_id=20
[2023/06/20 23:11:44] [debug] [task] destroy task=0x7f6a68015110 (task_id=0)
[2023/06/20 23:11:44] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:44] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:45] [debug] [task] created task=0x7f6a68015110 id=0 OK
[2023/06/20 23:11:45] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302704.402955918, {}], {"log"=>"fkcsnqpovj", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302704.903687318, {}], {"log"=>"lgvmzbxyqh", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:45] [debug] [out flush] cb_destroy coro_id=21
[2023/06/20 23:11:45] [debug] [task] destroy task=0x7f6a68015110 (task_id=0)
[2023/06/20 23:11:45] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:45] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:46] [debug] [task] created task=0x7f6a68015110 id=0 OK
[2023/06/20 23:11:46] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302705.404424512, {}], {"log"=>"qrughiphru", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302705.905138969, {}], {"log"=>"qblwbyarlz", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:46] [debug] [out flush] cb_destroy coro_id=22
[2023/06/20 23:11:46] [debug] [task] destroy task=0x7f6a68015110 (task_id=0)
[2023/06/20 23:11:46] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:46] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:47] [debug] [task] created task=0x7f6a68015110 id=0 OK
[2023/06/20 23:11:47] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302706.405846141, {}], {"log"=>"ibnynivvtl", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302706.906569923, {}], {"log"=>"kxnkkusfuc", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:47] [debug] [out flush] cb_destroy coro_id=23
[2023/06/20 23:11:47] [debug] [task] destroy task=0x7f6a68015110 (task_id=0)
[2023/06/20 23:11:47] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:47] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:48] [debug] [task] created task=0x7f6a68015110 id=0 OK
[2023/06/20 23:11:48] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302707.407249328, {}], {"log"=>"didbtnzbzi", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302707.907962637, {}], {"log"=>"zwjgiobcwl", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:48] [debug] [out flush] cb_destroy coro_id=24
[2023/06/20 23:11:48] [debug] [task] destroy task=0x7f6a68015110 (task_id=0)
[2023/06/20 23:11:48] [debug] [imds] using IMDSv1
[2023/06/20 23:11:48] [debug] [http_client] not using http_proxy for header
[2023/06/20 23:11:48] [debug] [aws_client] (null): http_do=0, HTTP Status: 500
[2023/06/20 23:11:48] [error] [filter:aws:aws.0] Failed to get instance EC2 Tags
[2023/06/20 23:11:48] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:48] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:49] [debug] [task] created task=0x7f6a68015110 id=0 OK
[2023/06/20 23:11:49] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302708.408652963, {}], {"log"=>"wxuaxrdqlv", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302708.909365672, {}], {"log"=>"amjlhcahcd", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:49] [debug] [out flush] cb_destroy coro_id=25
[2023/06/20 23:11:49] [debug] [task] destroy task=0x7f6a68015110 (task_id=0)
[2023/06/20 23:11:49] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:49] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:50] [debug] [task] created task=0x7f6a68015110 id=0 OK
[2023/06/20 23:11:50] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302709.410108494, {}], {"log"=>"vmqkaqruoz", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302709.910848699, {}], {"log"=>"cdwvfgpdfd", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:50] [debug] [out flush] cb_destroy coro_id=26
[2023/06/20 23:11:50] [debug] [task] destroy task=0x7f6a68015110 (task_id=0)
[2023/06/20 23:11:50] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:50] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:51] [debug] [task] created task=0x7f6a68015110 id=0 OK
[2023/06/20 23:11:51] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302710.411479728, {}], {"log"=>"natzjrcjph", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302710.912165690, {}], {"log"=>"gphrsbiilv", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:51] [debug] [out flush] cb_destroy coro_id=27
[2023/06/20 23:11:51] [debug] [task] destroy task=0x7f6a68015110 (task_id=0)
[2023/06/20 23:11:51] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:51] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:52] [debug] [task] created task=0x7f6a68015110 id=0 OK
[2023/06/20 23:11:52] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302711.412908977, {}], {"log"=>"svxguhdmax", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302711.913619606, {}], {"log"=>"onfnllyvow", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:52] [debug] [out flush] cb_destroy coro_id=28
[2023/06/20 23:11:52] [debug] [task] destroy task=0x7f6a68015110 (task_id=0)
[2023/06/20 23:11:52] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:52] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:11:53] [debug] [task] created task=0x7f6a68015670 id=0 OK
[2023/06/20 23:11:53] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302712.414349521, {}], {"log"=>"pvoxzlsfml", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302712.915064821, {}], {"log"=>"qzhnpqdxxn", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:11:53] [debug] [out flush] cb_destroy coro_id=29
[2023/06/20 23:11:53] [debug] [task] destroy task=0x7f6a68015670 (task_id=0)
[2023/06/20 23:11:53] [debug] [imds] using IMDSv1
[2023/06/20 23:11:53] [debug] [http_client] not using http_proxy for header
[2023/06/20 23:11:53] [debug] [imds] using IMDSv1
[2023/06/20 23:11:53] [debug] [http_client] not using http_proxy for header
[2023/06/20 23:11:53] [debug] [imds] using IMDSv1
[2023/06/20 23:11:53] [debug] [http_client] not using http_proxy for header
[2023/06/20 23:11:53] [debug] [filter:aws:aws.0] found tag Name which is included=1
[2023/06/20 23:11:53] [debug] [filter:aws:aws.0] found tag CUSTOMER_ID which is included=1
[2023/06/20 23:11:53] [debug] [input chunk] update output instances with new chunk size diff=175, records=1, input=stdin.0
[2023/06/20 23:11:53] [debug] [input chunk] update output instances with new chunk size diff=175, records=1, input=stdin.0
[2023/06/20 23:11:54] [debug] [task] created task=0x7f6a68021340 id=0 OK
[2023/06/20 23:11:54] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302713.415772286, {}], {"log"=>"dcszivdqsk", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "Name"=>"mwarzynski-fluentbit-dev", "CUSTOMER_ID"=>"43476f15-4ea1-46ef-895b-47d8f5745ac9"}]
[1] stdin.0: [[1687302713.916451286, {}], {"log"=>"cvguhtpksg", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "Name"=>"mwarzynski-fluentbit-dev", "CUSTOMER_ID"=>"43476f15-4ea1-46ef-895b-47d8f5745ac9"}]
[2023/06/20 23:11:54] [debug] [out flush] cb_destroy coro_id=30
[2023/06/20 23:11:54] [debug] [task] destroy task=0x7f6a68021340 (task_id=0)
[2023/06/20 23:11:54] [debug] [input chunk] update output instances with new chunk size diff=175, records=1, input=stdin.0
[2023/06/20 23:11:54] [debug] [input chunk] update output instances with new chunk size diff=175, records=1, input=stdin.0
[2023/06/20 23:11:55] [debug] [task] created task=0x7f6a68021340 id=0 OK
[2023/06/20 23:11:55] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302714.417153963, {}], {"log"=>"oohllwocqs", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "Name"=>"mwarzynski-fluentbit-dev", "CUSTOMER_ID"=>"43476f15-4ea1-46ef-895b-47d8f5745ac9"}]
[1] stdin.0: [[1687302714.917882541, {}], {"log"=>"dfeewqqtvd", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "Name"=>"mwarzynski-fluentbit-dev", "CUSTOMER_ID"=>"43476f15-4ea1-46ef-895b-47d8f5745ac9"}]
[2023/06/20 23:11:55] [debug] [out flush] cb_destroy coro_id=31
[2023/06/20 23:11:55] [debug] [task] destroy task=0x7f6a68021340 (task_id=0)
[2023/06/20 23:11:55] [debug] [input chunk] update output instances with new chunk size diff=175, records=1, input=stdin.0
[2023/06/20 23:11:55] [debug] [input chunk] update output instances with new chunk size diff=175, records=1, input=stdin.0
[2023/06/20 23:11:56] [debug] [task] created task=0x7f6a68021340 id=0 OK
[2023/06/20 23:11:56] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302715.418593370, {}], {"log"=>"dzltwikdmr", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "Name"=>"mwarzynski-fluentbit-dev", "CUSTOMER_ID"=>"43476f15-4ea1-46ef-895b-47d8f5745ac9"}]
[1] stdin.0: [[1687302715.919313354, {}], {"log"=>"ookptsgpfk", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "Name"=>"mwarzynski-fluentbit-dev", "CUSTOMER_ID"=>"43476f15-4ea1-46ef-895b-47d8f5745ac9"}]
[2023/06/20 23:11:56] [debug] [out flush] cb_destroy coro_id=32
[2023/06/20 23:11:56] [debug] [task] destroy task=0x7f6a68021340 (task_id=0)
[2023/06/20 23:11:56] [debug] [input chunk] update output instances with new chunk size diff=175, records=1, input=stdin.0
^C[2023/06/20 23:11:56] [engine] caught signal (SIGINT)
[2023/06/20 23:11:56] [debug] [task] created task=0x7f6a68021340 id=0 OK
[2023/06/20 23:11:56] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2023/06/20 23:11:56] [ warn] [engine] service will shutdown in max 5 seconds
[0] stdin.0: [[1687302716.420024849, {}], {"log"=>"zmkzggzhan", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "Name"=>"mwarzynski-fluentbit-dev", "CUSTOMER_ID"=>"43476f15-4ea1-46ef-895b-47d8f5745ac9"}]
[2023/06/20 23:11:56] [debug] [out flush] cb_destroy coro_id=33
[2023/06/20 23:11:56] [debug] [task] destroy task=0x7f6a68021340 (task_id=0)
[2023/06/20 23:11:56] [ warn] [input:stdin:stdin.0] end of file (stdin closed by remote end)
[2023/06/20 23:11:56] [ warn] [engine] service will shutdown in max 5 seconds
[2023/06/20 23:11:57] [ info] [engine] service has stopped (0 pending tasks)
[2023/06/20 23:11:57] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2023/06/20 23:11:57] [ info] [output:stdout:stdout.0] thread worker #0 stopped

Valgrind run

$ python3 logs_producer.py | valgrind ./bin/fluent-bit -c ./fluent-bit.conf
...
[2023/06/20 23:13:24] [debug] [out flush] cb_destroy coro_id=24
[2023/06/20 23:13:24] [debug] [task] destroy task=0x78c3b40 (task_id=0)
[2023/06/20 23:13:24] [debug] [imds] using IMDSv1
[2023/06/20 23:13:24] [debug] [http_client] not using http_proxy for header
[2023/06/20 23:13:24] [debug] [aws_client] (null): http_do=0, HTTP Status: 500
[2023/06/20 23:13:24] [error] [filter:aws:aws.0] Failed to get instance EC2 Tags
[2023/06/20 23:13:24] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:13:24] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[0] stdin.0: [[1687302804.446433952, {}], {"log"=>"sjzkwkkhip", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:13:25] [debug] [task] created task=0x7950910 id=0 OK
[1] stdin.0: [[1687302804.947079389, {}], {"log"=>"xygfrurbve", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:13:25] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2023/06/20 23:13:25] [debug] [out flush] cb_destroy coro_id=25
[2023/06/20 23:13:25] [debug] [task] destroy task=0x7950910 (task_id=0)
[2023/06/20 23:13:25] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:13:25] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:13:26] [debug] [task] created task=0x79d9c10 id=0 OK
[0] stdin.0: [[1687302805.447817812, {}], {"log"=>"lgnlcjzwnl", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:13:26] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[1] stdin.0: [[1687302805.948130255, {}], {"log"=>"uhwmylemhu", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:13:26] [debug] [out flush] cb_destroy coro_id=26
[2023/06/20 23:13:26] [debug] [task] destroy task=0x79d9c10 (task_id=0)
[2023/06/20 23:13:26] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:13:26] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[0] stdin.0: [[1687302806.449137818, {}], {"log"=>"doewwkdtwx", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:13:27] [debug] [task] created task=0x7a62f10 id=0 OK
[1] stdin.0: [[1687302806.949977808, {}], {"log"=>"rwjwuibenx", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:13:27] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2023/06/20 23:13:27] [debug] [out flush] cb_destroy coro_id=27
[2023/06/20 23:13:27] [debug] [task] destroy task=0x7a62f10 (task_id=0)
[2023/06/20 23:13:27] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:13:27] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:13:28] [debug] [task] created task=0x7a9ab60 id=0 OK
[0] stdin.0: [[1687302807.450893848, {}], {"log"=>"pcaczrcqes", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302807.951367397, {}], {"log"=>"kwkmoghwbm", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:13:28] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2023/06/20 23:13:28] [debug] [out flush] cb_destroy coro_id=28
[2023/06/20 23:13:28] [debug] [task] destroy task=0x7a9ab60 (task_id=0)
[2023/06/20 23:13:28] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:13:28] [debug] [input chunk] update output instances with new chunk size diff=95, records=1, input=stdin.0
[2023/06/20 23:13:29] [debug] [task] created task=0x7b75540 id=0 OK
[0] stdin.0: [[1687302808.451598896, {}], {"log"=>"eiiolurnhv", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[1] stdin.0: [[1687302808.952401873, {}], {"log"=>"afwsrzjqar", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168"}]
[2023/06/20 23:13:29] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2023/06/20 23:13:29] [debug] [out flush] cb_destroy coro_id=29
[2023/06/20 23:13:29] [debug] [task] destroy task=0x7b75540 (task_id=0)
[2023/06/20 23:13:29] [debug] [imds] using IMDSv1
[2023/06/20 23:13:29] [debug] [http_client] not using http_proxy for header
[2023/06/20 23:13:29] [debug] [imds] using IMDSv1
[2023/06/20 23:13:29] [debug] [http_client] not using http_proxy for header
[2023/06/20 23:13:29] [debug] [imds] using IMDSv1
[2023/06/20 23:13:29] [debug] [http_client] not using http_proxy for header
[2023/06/20 23:13:29] [debug] [filter:aws:aws.0] found tag Name which is included=1
[2023/06/20 23:13:29] [debug] [filter:aws:aws.0] found tag CUSTOMER_ID which is included=1
[2023/06/20 23:13:29] [debug] [input chunk] update output instances with new chunk size diff=175, records=1, input=stdin.0
[2023/06/20 23:13:29] [debug] [input chunk] update output instances with new chunk size diff=175, records=1, input=stdin.0
[2023/06/20 23:13:30] [debug] [task] created task=0x7c0bda0 id=0 OK
[0] stdin.0: [[1687302809.453176773, {}], {"log"=>"qlisjlqwoa", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "Name"=>"mwarzynski-fluentbit-dev", "CUSTOMER_ID"=>"43476f15-4ea1-46ef-895b-47d8f5745ac9"}]
[1] stdin.0: [[1687302809.953764480, {}], {"log"=>"kmjgdegdez", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "Name"=>"mwarzynski-fluentbit-dev", "CUSTOMER_ID"=>"43476f15-4ea1-46ef-895b-47d8f5745ac9"}]
[2023/06/20 23:13:30] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2023/06/20 23:13:30] [debug] [out flush] cb_destroy coro_id=30
[2023/06/20 23:13:30] [debug] [task] destroy task=0x7c0bda0 (task_id=0)
[2023/06/20 23:13:30] [debug] [input chunk] update output instances with new chunk size diff=175, records=1, input=stdin.0
[2023/06/20 23:13:30] [debug] [input chunk] update output instances with new chunk size diff=175, records=1, input=stdin.0
[2023/06/20 23:13:31] [debug] [task] created task=0x7c94e20 id=0 OK
[0] stdin.0: [[1687302810.454493939, {}], {"log"=>"bspdspfbst", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "Name"=>"mwarzynski-fluentbit-dev", "CUSTOMER_ID"=>"43476f15-4ea1-46ef-895b-47d8f5745ac9"}]
[1] stdin.0: [[1687302810.955245984, {}], {"log"=>"kfztcwxlds", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "Name"=>"mwarzynski-fluentbit-dev", "CUSTOMER_ID"=>"43476f15-4ea1-46ef-895b-47d8f5745ac9"}]
[2023/06/20 23:13:31] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2023/06/20 23:13:31] [debug] [out flush] cb_destroy coro_id=31
[2023/06/20 23:13:31] [debug] [task] destroy task=0x7c94e20 (task_id=0)
[2023/06/20 23:13:31] [debug] [input chunk] update output instances with new chunk size diff=175, records=1, input=stdin.0
[2023/06/20 23:13:31] [debug] [input chunk] update output instances with new chunk size diff=175, records=1, input=stdin.0
[2023/06/20 23:13:32] [debug] [task] created task=0x7d1dea0 id=0 OK
[0] stdin.0: [[1687302811.455955388, {}], {"log"=>"xsipauitvy", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "Name"=>"mwarzynski-fluentbit-dev", "CUSTOMER_ID"=>"43476f15-4ea1-46ef-895b-47d8f5745ac9"}]
[1] stdin.0: [[1687302811.956567417, {}], {"log"=>"dadyiftnpx", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "Name"=>"mwarzynski-fluentbit-dev", "CUSTOMER_ID"=>"43476f15-4ea1-46ef-895b-47d8f5745ac9"}]
[2023/06/20 23:13:32] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2023/06/20 23:13:32] [debug] [out flush] cb_destroy coro_id=32
[2023/06/20 23:13:32] [debug] [task] destroy task=0x7d1dea0 (task_id=0)
[2023/06/20 23:13:32] [debug] [input chunk] update output instances with new chunk size diff=175, records=1, input=stdin.0
[2023/06/20 23:13:32] [debug] [input chunk] update output instances with new chunk size diff=175, records=1, input=stdin.0
[2023/06/20 23:13:33] [debug] [task] created task=0x7da6f20 id=0 OK
[0] stdin.0: [[1687302812.457281436, {}], {"log"=>"tkrwsizxrf", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "Name"=>"mwarzynski-fluentbit-dev", "CUSTOMER_ID"=>"43476f15-4ea1-46ef-895b-47d8f5745ac9"}]
[1] stdin.0: [[1687302812.958238344, {}], {"log"=>"rltoogpkzm", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "Name"=>"mwarzynski-fluentbit-dev", "CUSTOMER_ID"=>"43476f15-4ea1-46ef-895b-47d8f5745ac9"}]
[2023/06/20 23:13:33] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2023/06/20 23:13:33] [debug] [out flush] cb_destroy coro_id=33
[2023/06/20 23:13:33] [debug] [task] destroy task=0x7da6f20 (task_id=0)
[2023/06/20 23:13:33] [debug] [input chunk] update output instances with new chunk size diff=175, records=1, input=stdin.0
[2023/06/20 23:13:33] [debug] [input chunk] update output instances with new chunk size diff=175, records=1, input=stdin.0
^C[2023/06/20 23:13:34] [engine] caught signal (SIGINT)
[2023/06/20 23:13:34] [ warn] [input:stdin:stdin.0] end of file (stdin closed by remote end)
[2023/06/20 23:13:34] [debug] [task] created task=0x7e2fff0 id=0 OK
[2023/06/20 23:13:34] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] stdin.0: [[1687302813.458823645, {}], {"log"=>"snbpnvsoom", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "Name"=>"mwarzynski-fluentbit-dev", "CUSTOMER_ID"=>"43476f15-4ea1-46ef-895b-47d8f5745ac9"}]
[1] stdin.0: [[1687302813.959496394, {}], {"log"=>"zapwsnhzak", "az"=>"us-east-1a", "ec2_instance_id"=>"i-0e66fc7f9809d7168", "Name"=>"mwarzynski-fluentbit-dev", "CUSTOMER_ID"=>"43476f15-4ea1-46ef-895b-47d8f5745ac9"}]
[2023/06/20 23:13:34] [ warn] [engine] service will shutdown in max 5 seconds
[2023/06/20 23:13:34] [ warn] [engine] service will shutdown in max 5 seconds
[2023/06/20 23:13:34] [debug] [out flush] cb_destroy coro_id=34
[2023/06/20 23:13:34] [debug] [task] destroy task=0x7e2fff0 (task_id=0)
[2023/06/20 23:13:34] [ info] [engine] service has stopped (0 pending tasks)
[2023/06/20 23:13:34] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2023/06/20 23:13:34] [ info] [output:stdout:stdout.0] thread worker #0 stopped
==2648==
==2648== HEAP SUMMARY:
==2648==     in use at exit: 0 bytes in 0 blocks
==2648==   total heap usage: 5,045 allocs, 5,045 frees, 20,241,427 bytes allocated
==2648==
==2648== All heap blocks were freed -- no leaks are possible
==2648==
==2648== For lists of detected and suppressed errors, rerun with: -s
==2648== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

Enter [N/A] in the box if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • [N/A] Run local packaging test showing all targets (including any new ones) build.
  • [N/A] Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

Backporting

  • [N/A] Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@mwarzynski mwarzynski temporarily deployed to pr April 22, 2023 15:26 — with GitHub Actions Inactive
@mwarzynski mwarzynski temporarily deployed to pr April 22, 2023 15:26 — with GitHub Actions Inactive
@mwarzynski mwarzynski temporarily deployed to pr April 22, 2023 15:26 — with GitHub Actions Inactive
@mwarzynski mwarzynski changed the title Filter aws ec2 tags handle 500 errors filter: aws ec2 tags handle 500 errors Apr 22, 2023
@mwarzynski mwarzynski changed the title filter: aws ec2 tags handle 500 errors filter: aws ec2 tags handle gracefully 500 errors Apr 22, 2023
@mwarzynski mwarzynski temporarily deployed to pr April 22, 2023 15:46 — with GitHub Actions Inactive
@mwarzynski
Contributor Author

@PettitWesley Hey, I wonder if you could find some time to check out this PR. Absolutely no pressure though.

Contributor

@PettitWesley PettitWesley left a comment

@mwarzynski Sorry, I think since it was a draft I never got notified about this.

if (ctx->tag_is_enabled[i] == FLB_TRUE) {
ctx->new_keys++;
flb_plg_error(ctx->ins, "Failed to get instance EC2 Tags, will not retry");
if (ret == -3) {
Contributor

From looking at the code myself quickly, it looks like this is only returned if the config is invalid?

So that's the one case we don't retry?

I think I can agree with that.

For other cases, I can think of three options:

  1. Just retry forever on each flush and keep emitting error messages. This ensures that if a user enables tagging or fixes IMDS, Fluent Bit will start working without restart. The downside is that you get a spam of errors which might increase your log cost.
  2. Retry some set number of times that we track with a counter. This could be user configurable.
  3. Retry at some fixed interval which the user can configure. Maybe by default it's 5 minutes or something, so you won't get more than one error message per 5 minutes; you track the last retry time in the code. If the user sets the interval to -1, that could mean no retries, just fail right away.

I like #3, and I think we also want to give users the option to require tags, which could be supported with a zero or negative value that means no retries.

#3 ensures we can't spam their logs with errors.
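A rough model of option 3, assuming a setting like the `retry_interval_s` described in the PR description (hypothetical Python sketch, not the actual C implementation; the class name and the -1 semantics are only the suggestion above):

```python
class IntervalRetry:
    """Option 3 sketch: attempt a fetch at most once per interval.

    A negative interval means "no retries": fail once, then give up.
    """

    def __init__(self, interval_s=300):
        self.interval_s = interval_s
        self.last_attempt = None   # timestamp of the last fetch attempt
        self.done = False          # True once fetched successfully (or given up)

    def should_attempt(self, now):
        if self.done:
            return False
        if self.last_attempt is None:
            return True            # the first attempt is always allowed
        if self.interval_s < 0:
            self.done = True       # negative interval: never retry
            return False
        return now - self.last_attempt >= self.interval_s

    def record_attempt(self, now, success):
        self.last_attempt = now
        if success:
            self.done = True
```

Because the check can only run when the filter is invoked on a flush, the effective gap between retries may be longer than the configured interval, which is also why at most one error message per interval reaches the logs.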

Contributor Author

@mwarzynski mwarzynski Jun 13, 2023

My initial version of this PR is a proposal: a minimal effort to handle 500s. It is really meant to start the discussion about the desired solution, and is most likely not the solution itself.

From looking at the code myself quickly, it looks like this is only returned if the config is invalid?
So that's the one case we don't retry?

-3 means that configuration is invalid and Fluent Bit will exit with an error code if that happens.

Yes, that's the only case we don't retry -- we simply exit the application if it happens. For all other cases, if fetching tags fails for any reason other than invalid configuration, we won't retry (and none of the flushed logs will have the tags injected).


if a user enables tagging or fixes IMDS, Fluent Bit will start working without restart

Changes to the EC2 instance-metadata options and to tag values are only picked up after an EC2 restart, so the user would have to restart the instance anyway for the change to take effect (this doesn't apply to Nitro instances though). I don't know the IMDS as well, so it might be one of the cases that can be fixed without a restart.


  1. Just retry forever on each flush and keep emitting error messages. This ensures that if a user enables tagging or fixes IMDS, Fluent Bit will start working without restart. The downside is that you get a spam of errors which might increase your log cost.

Retrying on each flush might be overkill, especially since these errors may not change. Admittedly, it would be the simplest approach, so that's a plus.

In my opinion, we should let the user configure the maximum retry rate (with sane defaults provided). I wouldn't worry about the spam of errors, as there is really no reasonable alternative: letting the user define the maximum retry rate indirectly controls the volume of error logs, and in the best case someone will notice these errors and fix the underlying issue.

  2. Retry some set number of times

It is an interesting option, but if the issue were fixed only after the configured maximum number of retries, our filter would never load the tags. I would rather have the application retry indefinitely, but maybe at a lower rate if the error is persistent. That way it at least has a chance to fix itself.

  3. Retry at some fixed interval which the user can configure, maybe 5 minutes or something by default

Yeah, I also like 'Option 3'. Let's call this 'Fixed Interval Retries'.
@PettitWesley I will try to implement this option as the next version of this PR.


I may have a few more ideas/suggestions.

NIT: Abstraction Layer

First of all, I don't think we should treat EC2 Tags in a special way. EC2 Tags are special in only one respect: the instance must have the instance-metadata-options enabled for this functionality to work. In other words, we shouldn't retry 404s for EC2 Tags. All other errors should be treated the same as errors fetching any other piece of metadata (like the instance ID). If we intend to implement a new retry mechanism for EC2 tags, we should keep in mind that it might later be reused for other metadata information groups. That's just to say we should try to provide a decent abstraction layer for the retries, but we will see how it plays out in practice.

Retries with Exponential Backoff

Regarding the retries and the 'Fixed Interval Retries' proposal: personally, I usually consider exponential backoff whenever I add retries to a component, and I wonder if it would provide some value in this case as well. The main advantage of exponential backoff is that Fluent Bit would recover faster when the underlying issue is short-lived or temporary. It could start by retrying at every flush and, if the fetches fail repeatedly, stretch the retry interval to longer periods -- possibly even longer than we would pick for Fixed Interval Retries, since after a sufficient number of failed retries we can suspect the issue is likely to persist.

Example where exp backoff would be better:

  1. [t+0s] flush, ec2 tags failed => logs without tags
  2. [t+1s] flush, retry ec2 tags failed => logs without tags
  3. [t+2s] flush, retry ec2 tags success => logs with tags
  4. [t+3s] flush => logs with tags
  5. [t+4s] flush => logs with tags
  6. [t+5s] flush => logs with tags

With 'Fixed Interval Retries' and a default retry interval of 5s, by contrast, the logs pushed at t+{2,3,4}s would have no tags.

Of course, if the issue were persistent, the retry interval would grow accordingly. It depends on the configuration, but assuming: 0) flush interval = 1s, 1) start_retry_duration = 100ms, 2) exp = 2, 3) max_retry_duration = 15m:

[0.1, // => 1s [min allowed flush duration is 1s, so we get to do something every 1s -- can't retry sooner]
 0.2, // => 1s
 0.4, // => 1s 
 0.8, // => 1s
 1.6, // => 2s
 3.2, // => 4s
 6.4, // => 7s
 12.8, // => 13s
 25.6, // => 26s
 51.2, // => 52s
 102.4, // => 103s = 1m43s
 204.8, // => 205s = 3m25s
 409.6, // => 410s = 6m50s
 819.2, // => 820s = 13m40s
 900, // => 900s = 15m
 900, // => 900s = 15m [max duration]
 ... 

In this example, the first 4 retries would happen every 1s. Then we would wait increasingly long between retries (doubling every time), until eventually we would retry only every 15m. Of course, the values are configurable -- this is only an example.
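The schedule above follows directly from the stated parameters; a few lines reproduce it (hypothetical helper written for this discussion, not Fluent Bit code; the parameter names mirror start_retry_duration, exp, and max_retry_duration):

```python
def backoff_schedule(start, exp, cap, n):
    """Return the first n retry delays for capped exponential backoff."""
    delays = []
    d = start
    for _ in range(n):
        delays.append(min(d, cap))  # never wait longer than the cap
        d *= exp
    return delays

# Reproduces the list above: 0.1, 0.2, 0.4, ..., 819.2, 900, 900
schedule = backoff_schedule(0.1, 2, 900, 16)
```

The actual wait between attempts would additionally be clamped by the flush interval, as shown in the comments of the list above (nothing can be retried sooner than the next flush).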

What do you think? Do you see some value in introducing more complexity and exponential backoff with the potential advantage of more logs with injected tags?

Allow to block pushing logs if EC2 Tags weren't fetched

For some use cases (for transparency, mine included), it might be useful to block pushing logs until Filter AWS successfully fetches all metadata groups. That is, during the plugin initialization phase, it would have to fetch every piece of metadata enabled in the configuration -- not only EC2 tags, but for example also the instance ID. If Fluent Bit couldn't fetch all configured datapoints and this special option were enabled, the init function would fail and Fluent Bit would fail too, deferring the retries to upper levels like systemd.

The idea is that the more instances we have, the less useful logs without tags become. At the moment, if Fluent Bit flushes logs without tags, I am not able to attribute them to particular components and debug issues accordingly. It might be a specialised edge case, but I would propose exposing a configuration option that fails the Filter AWS initialization if fetching one of the groups fails.


Let me know what you think about my suggestions. As I said, I will implement 'Fixed Interval Retries' without any of the additional concepts above as the next version of the PR. If you agree with some of my suggestions, we can then think about adding more complexity to the PR.

(Also, sorry for late reply, I somehow missed the notifications from this PR too...)

Contributor

Changes to the EC2 instance-metadata options and to tag values are only picked up after an EC2 restart, so the user would have to restart the instance anyway for the change to take effect (this doesn't apply to Nitro instances though). I don't know the IMDS as well, so it might be one of the cases that can be fixed without a restart.

Ah I see... I was testing on a Nitro-based instance before, so I thought this was how it always works.

I like both your "fixed interval retries" idea (simpler user config and experience) and the exponential backoff idea. Backoff has the downside that, as the retry time grows, it takes a while for a fix to take effect once you do fix the issue. Also, configuring the backoff formula is probably more complicated for users? What would your proposed config options be? For FLB output retries we have scheduler.cap and scheduler.base to configure the maximum retry time and the backoff formula; we could do the same here with sane defaults. https://docs.fluentbit.io/manual/administration/scheduling-and-retries

For some use cases (for transparency, mine included), it might be useful to block pushing logs until Filter AWS successfully fetches all metadata groups. I mean, it would have to fetch every enabled piece of metadata in the configuration -- not only EC2 tags, but for example also the instance ID.

Maybe we handle this with a special zero value for the timeout?

Contributor Author

it might be useful to block pushing logs until Filter AWS successfully fetches all metadata groups

Maybe we handle this with a special zero value for the timeout?

OK, this is a good idea. Do you want me to include this behavior of retries=0 in this PR?

@mwarzynski mwarzynski force-pushed the filter-aws-ec2-tags-handle-500-errors branch from f5f365f to 89990d3 Compare June 18, 2023 02:49
@mwarzynski mwarzynski temporarily deployed to pr June 18, 2023 02:50 — with GitHub Actions Inactive
@mwarzynski mwarzynski temporarily deployed to pr June 18, 2023 02:50 — with GitHub Actions Inactive
@mwarzynski mwarzynski temporarily deployed to pr June 18, 2023 02:51 — with GitHub Actions Inactive
@mwarzynski
Contributor Author

mwarzynski commented Jun 18, 2023

@PettitWesley I pushed the first version which should satisfy the minimal requirements for 'Fixed Interval Retries' and handle 500s for EC2 tag requests.

I initially set the default retry interval to 5s. We can make it longer (e.g. 5m).

I tested my changes locally. In the local run, Fluent Bit received 2 logs per second on the input, with the flush interval set to 1s. Here are the logs from the EC2 metadata server 'mock':

mwarzynski@6d1934297b3c:/fluent-bit-dev/build-dev$ python3 ec2_server.py
127.0.0.1 - - [18/Jun/2023 02:48:26] "GET /latest/meta-data/instance-id/ HTTP/1.1" 200 -
127.0.0.1 - - [18/Jun/2023 02:48:26] "GET /latest/meta-data/placement/availability-zone/ HTTP/1.1" 200 -
127.0.0.1 - - [18/Jun/2023 02:48:26] "GET /latest/meta-data/tags/instance HTTP/1.1" 500 -
127.0.0.1 - - [18/Jun/2023 02:48:31] "GET /latest/meta-data/tags/instance HTTP/1.1" 500 -
127.0.0.1 - - [18/Jun/2023 02:48:36] "GET /latest/meta-data/tags/instance HTTP/1.1" 500 -
127.0.0.1 - - [18/Jun/2023 02:48:41] "GET /latest/meta-data/tags/instance HTTP/1.1" 500 -
^Cmwarzynski@6d1934297b3c:/fluent-bit-dev/build-dev$ python3 ec2_server.py  # fixed the 500 bug, reload server -- will respond with 200s
127.0.0.1 - - [18/Jun/2023 02:48:46] "GET /latest/meta-data/tags/instance HTTP/1.1" 200 -
127.0.0.1 - - [18/Jun/2023 02:48:46] "GET /latest/meta-data/tags/instance/Name HTTP/1.1" 200 -
127.0.0.1 - - [18/Jun/2023 02:48:46] "GET /latest/meta-data/tags/instance/CUSTOMER_ID HTTP/1.1" 200 -

As you can see, Fluent Bit requested the EC2 tags every 5s even though flushes were happening every 1s. If no logs were being flushed, the interval would be even longer (until the next flush).
Here are FluentBit logs: https://gist.github.com/mwarzynski/83f9dfc8444fb6f68ba2df68c949ca2e
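The mock itself isn't included in the PR, but a minimal stand-in for the ec2_server.py used above could look like this (hypothetical sketch: the paths and values are taken from the request log above, and the fail_tags flag simulates the 500 bug that was later "fixed"):

```python
import http.server

# Canned IMDS responses, values taken from the test logs in this PR.
METADATA = {
    "/latest/meta-data/instance-id/": "i-0e66fc7f9809d7168",
    "/latest/meta-data/placement/availability-zone/": "us-east-1a",
    "/latest/meta-data/tags/instance": "Name\nCUSTOMER_ID",
    "/latest/meta-data/tags/instance/Name": "mwarzynski-fluentbit-dev",
    "/latest/meta-data/tags/instance/CUSTOMER_ID": "43476f15-4ea1-46ef-895b-47d8f5745ac9",
}

fail_tags = True  # flip to False to simulate fixing the 500 bug


def respond(path):
    """Return (status, body) for a metadata request."""
    if fail_tags and path.startswith("/latest/meta-data/tags/"):
        return 500, ""          # broken tags endpoint
    if path in METADATA:
        return 200, METADATA[path]
    return 404, ""


class Handler(http.server.BaseHTTPRequestHandler):
    def do_GET(self):
        status, body = respond(self.path)
        self.send_response(status)
        self.end_headers()
        self.wfile.write(body.encode())


def serve(port=8080):
    # Blocking; run manually and point the filter's IMDS host at it.
    http.server.HTTPServer(("127.0.0.1", port), Handler).serve_forever()
```

Pointing the filter at such a mock makes it easy to reproduce the 500-then-recovery sequence shown in the request log above.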

Please note that I introduced a concept of metadata_group, so that we can separate concerns and define how critical each metadata group is in the future. At the moment, if anything from the 'base' group fails, we don't include any of the successfully fetched information in the log. However, I believe the correct approach is to separate the concerns and either push whatever we managed to fetch, or block on retries for the metadata groups marked as required in the configuration. For this reason, I added another struct definition, flb_aws_filter_metadata_group, so that we can extend the retry strategy and implement more groups in the future.
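The separation of concerns could be modelled like this (hypothetical Python sketch of the metadata_group idea, not the C implementation; the group names and record shapes are illustrative):

```python
# Each group ("base" metadata, EC2 tags, ...) is fetched and retried
# independently; every group that succeeded is injected into each record.
GROUPS = ("base", "tags")


def enrich(record, fetched):
    """Merge the keys of every successfully fetched group into the record."""
    out = dict(record)
    for group in GROUPS:
        out.update(fetched.get(group) or {})  # a failed group (None) is skipped
    return out
```

With this model, a 500 on the tags endpoint would still leave the base keys (AZ, instance ID) in the output, which is the behaviour the PR aims for.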

Let me know what you think about the configuration design for the retry interval: tags_retry_interval_s. In my opinion, if we later allow retries for each metadata group separately, we want something more general, e.g. simply retry_interval_s; tags_retry_interval_s is too specific. If we add separate metadata groups for instance ID, AZ, AMI, and so on, creating a separate config option per group would be overkill.

backoff formula is probably more complicated

Yeah, it is. I would stick with the simpler solution for the first iteration. I can propose exponential backoff in a follow-up PR (if we think it's worth it).

For some use cases (for transparency, mine included), it might be useful to block pushing logs until Filter AWS successfully fetches all metadata groups.
Maybe we handle this with a special zero value for the timeout?

I think we should make the configuration more explicit. In case someone wants to require EC2 Tags in the logs, we can add another configuration option called required tags (with an empty value as the default). If required tags is set, we wouldn't allow logs to be flushed until the EC2 tags are successfully fetched (or tags are disabled, i.e. 404). Such a configuration design would also be easy to extend in the future (e.g. `required tags,instance_id`, or `required *` to require all enabled groups).

What do you think?
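The proposed syntax could be parsed roughly like this (hypothetical sketch; the option name and semantics are only the proposal above, nothing merged):

```python
def parse_required(value, enabled_groups):
    """Parse the proposed `required` option: a comma-separated list of
    metadata groups, or '*' meaning every enabled group."""
    if not value:
        return set()                      # default: nothing is required
    if value.strip() == "*":
        return set(enabled_groups)        # require all enabled groups
    return {g.strip() for g in value.split(",")}
```

At init time, the filter would then refuse to flush (or fail outright) while any group in the returned set has not been fetched successfully.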

@mwarzynski mwarzynski requested a review from PettitWesley June 18, 2023 03:06
@mwarzynski mwarzynski temporarily deployed to pr June 18, 2023 03:16 — with GitHub Actions Inactive
@mwarzynski mwarzynski force-pushed the filter-aws-ec2-tags-handle-500-errors branch from 89990d3 to 1f9e707 Compare June 18, 2023 03:29
@mwarzynski mwarzynski temporarily deployed to pr June 18, 2023 03:30 — with GitHub Actions Inactive
@mwarzynski mwarzynski temporarily deployed to pr June 18, 2023 03:30 — with GitHub Actions Inactive
@mwarzynski mwarzynski temporarily deployed to pr June 18, 2023 03:30 — with GitHub Actions Inactive
@mwarzynski mwarzynski temporarily deployed to pr June 18, 2023 03:50 — with GitHub Actions Inactive
@PettitWesley
Contributor

@mwarzynski I like the idea of creating a "required" tag group which causes the filter to fail to send logs unless certain metadata can be fetched.

For some use cases (for transparency, mine included), it might be useful to block pushing logs until Filter AWS successfully fetches all metadata groups.

Would you be okay with losing the logs instead? That sentence should read "unless" not "until". Because that's what will happen if the filter won't pass through any logs unless a value can be fetched.

I think that's probably a very niche use case?

@mwarzynski
Contributor Author

mwarzynski commented Jun 19, 2023

Would you be okay with losing the logs instead? That sentence should read "unless" not "until". Because that's what will happen if the filter won't pass through any logs unless a value can be fetched.

@PettitWesley In my opinion, I would rather submit logs without the metadata than lose them altogether. Of course, this decision may vary depending on the specific use case, but I would not be comfortable with losing the logs completely: having logs without certain metadata is preferable to losing them entirely.

Isn't it dependent on the type of input? In other words, inputs can vary in terms of source persistence:

  1. Non-persistent inputs, such as stdout from a command execution.
  2. Persistent inputs, like using tail on a log file (assuming that file+offset.db is stored on a persistent volume).

I would assume that in the scenario where required metadata is not fetched, the persistent input logs (2.) would not be lost. At the same time, Filter probably has no reliable way to distinguish source persistence, so given that we are not comfortable with losing the non-persistent logs, it might not be productive to introduce the required option to filters.

I think that's probably a very niche use case?

It most likely is. Furthermore, I would like to emphasize that it would address a specific scenario that is not expected to occur frequently, if at all.

@PettitWesley
Contributor

@mwarzynski

Persistent inputs, like using tail on a log file (assuming that file+offset.db is stored on a persistent volume).

So the tail input keeps a file offset position. Once it reads the logs, it advances the offset and considers the data read. If a filter then drops it all, the tail input still considers it read.

Take a look at code in flb_filter.c in flb_filter_do.
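A toy model of that interaction (illustrative names only, not Fluent Bit's actual `flb_filter_do` code): the input advances its persisted offset as soon as it reads bytes, before any filter runs, so records a filter drops are not re-read later.

```c
#include <stddef.h>

/* Toy model: the input advances its stored offset on read, *before*
 * filters run. If a filter later drops the records, the offset is not
 * rewound, so the data is gone even for "persistent" sources. */

struct toy_tail {
    long offset;              /* persisted file offset (like the offset db) */
};

/* Read n bytes: the offset advances immediately, marking data consumed. */
static long toy_tail_read(struct toy_tail *t, long n)
{
    t->offset += n;
    return n;
}

/* A filter that drops everything, like one refusing to pass logs through. */
static int toy_filter_drop_all(void)
{
    return 0;                 /* zero records survive the filter */
}

/* One pipeline cycle: returns the number of records actually emitted. */
static int toy_pipeline_cycle(struct toy_tail *t, long bytes)
{
    toy_tail_read(t, bytes);  /* offset has already advanced here */
    return toy_filter_drop_all();
}
```

Even though nothing is emitted, the offset has moved past the data, which is why a "required metadata" option would effectively mean losing those logs.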

{
    int ret;

    ctx->metadata_groups[FLB_FILTER_AWS_METADATA_GROUP_TAGS].last_execution = time(NULL);
Contributor

nit: last_fetch or last_attempt I feel makes more sense

Contributor Author

OK, I renamed this field to last_fetch_attempt.
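The gating that this renamed field enables can be sketched as follows (a sketch with illustrative names, not the exact PR code; the PR stores the attempt timestamp per metadata group):

```c
#include <time.h>

/* Per-group retry state: a group that failed to fetch is retried only
 * after at least retry_interval_s seconds since the last attempt. */

struct metadata_group {
    int    done;               /* group fetched successfully */
    time_t last_fetch_attempt; /* set at every attempt, success or not */
};

/* Returns 1 when a fetch should be attempted: the group is not done yet
 * and the minimum interval since the last attempt has elapsed. */
static int should_fetch(const struct metadata_group *g,
                        time_t now, int retry_interval_s)
{
    if (g->done) {
        return 0;
    }
    return (now - g->last_fetch_attempt) >= retry_interval_s;
}
```

Because the check runs on every flush, retries happen at flush time but never more often than the configured interval.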

@@ -1048,6 +1114,12 @@ static struct flb_config_map config_map[] = {
"if both tags_include and tags_exclude are specified, configuration is invalid"
" and plugin fails"
},
{
FLB_CONFIG_MAP_INT, "tags_retry_interval_s", "5",
Contributor

I think we should pick a default that's higher than 5? That's 12 calls per minute, which is a lot.

Contributor Author

I've changed the default value to 300 (5m).
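For reference, a filter section exercising the retry behaviour might look like this (a sketch: `tags_enabled` is the option from the related EC2 tags work, and the option name used here is the final `retry_interval_s` from the PR description, with its 300s default):

```
[FILTER]
    Name              aws
    Match             *
    tags_enabled      true
    # minimum seconds between retries of a failed metadata group fetch
    retry_interval_s  300
```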

Signed-off-by: Mateusz Warzyński <[email protected]>
group_tag and group_base instead of array of groups

Signed-off-by: Mateusz Warzyński <[email protected]>
add test cases for injecting following information:
 - instance_id
 - instance_type
 - private_ip
 - vpc_id
 - ami_id
 - account_id
 - hostname
 - az

Signed-off-by: Mateusz Warzyński <[email protected]>
'retry_interval_s' is specified in the filter aws configuration

Signed-off-by: Mateusz Warzyński <[email protected]>
@mwarzynski
Contributor Author

mwarzynski commented Mar 15, 2024

I rebased my changes on the latest origin:master. It compiled successfully on my machine.

...
[ 89%] Linking C executable ../bin/fluent-bit
[ 89%] Building C object src/CMakeFiles/fluent-bit-shared.dir/flb_mp.c.o
[ 89%] Building C object src/CMakeFiles/fluent-bit-shared.dir/flb_kv.c.o
[ 89%] Building C object src/CMakeFiles/fluent-bit-shared.dir/flb_api.c.o
[ 89%] Building C object src/CMakeFiles/fluent-bit-shared.dir/flb_csv.c.o
[ 89%] Building C object src/CMakeFiles/fluent-bit-shared.dir/flb_lib.c.o
[ 89%] Building C object src/CMakeFiles/fluent-bit-shared.dir/flb_log.c.o
[ 89%] Building C object src/CMakeFiles/fluent-bit-shared.dir/flb_env.c.o
[ 89%] Building C object src/CMakeFiles/fluent-bit-shared.dir/flb_file.c.o
[ 89%] Building C object src/CMakeFiles/fluent-bit-shared.dir/flb_uri.c.o
[ 89%] Building C object src/CMakeFiles/fluent-bit-shared.dir/flb_hash_table.c.o
[ 90%] Building C object src/CMakeFiles/fluent-bit-shared.dir/flb_help.c.o
[ 90%] Building C object src/CMakeFiles/fluent-bit-shared.dir/flb_pack.c.o
[ 90%] Built target fluent-bit-bin
...
Scanning dependencies of target hello_world
Scanning dependencies of target out_lib
[ 98%] Building C object examples/hello_world/CMakeFiles/hello_world.dir/hello_world.c.o
[ 98%] Building C object examples/out_lib/CMakeFiles/out_lib.dir/out_lib.c.o
[ 98%] Building C object examples/hello_world/CMakeFiles/hello_world.dir/__/__/lib/lwrb/lwrb/src/lwrb/lwrb.c.o
[100%] Building C object examples/out_lib/CMakeFiles/out_lib.dir/__/__/lib/lwrb/lwrb/src/lwrb/lwrb.c.o
[100%] Linking C executable ../../bin/hello_world
[100%] Linking C executable ../../bin/out_lib
[100%] Built target hello_world
[100%] Built target out_lib

@PettitWesley
Contributor

I used the config that you have in the description and got a single attempt to request tags, which fails since it's not enabled; then it proceeded to add other metadata to the logs:

[2024/03/18 22:21:41] [debug] [http_client] server 169.254.169.254:80 will close connection #23
[2024/03/18 22:21:41] [debug] [aws_client] (null): http_do=0, HTTP Status: 404
[2024/03/18 22:21:41] [debug] [imds] metadata request failure response
<?xml version="1.0" encoding="iso-8859-1"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
		 "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
 <head>
  <title>404 - Not Found</title>
 </head>
 <body>
  <h1>404 - Not Found</h1>
 </body>
</html>

[2024/03/18 22:21:41] [ warn] [filter:aws:aws.0] EC2 instance metadata tag request returned 404. This likely indicates your instance has no tags or the EC2 tagging metadata API is not enabled
[2024/03/18 22:21:41] [debug] [stdout:stdout.0] created event channels: read=23 write=24
[2024/03/18 22:21:41] [ info] [output:stdout:stdout.0] worker #0 started
[2024/03/18 22:21:41] [ info] [sp] stream processor started
[2024/03/18 22:21:42] [debug] [input chunk] update output instances with new chunk size diff=94, records=1, input=dummy.0
[2024/03/18 22:21:43] [debug] [task] created task=0x7f4db001f5c0 id=0 OK
[2024/03/18 22:21:43] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2024/03/18 22:21:43] [debug] [input chunk] update output instances with new chunk size diff=94, records=1, input=dummy.0
[0] dummy.0: [[1710800502.687835217, {}], {"message"=>"dummy", "az"=>"us-west-2a", "ec2_instance_id"=>"i-0385871b581452acd"}]

From what I can tell, it never retried the tags fetch again (and never got another failed response). I think I'm not understanding something here. I configured retry_interval_s 5.

metadata_fetched = FLB_FALSE;
}

if (metadata_fetched) {
Contributor

nit: Eduardo says to always use if (metadata_fetched == FLB_TRUE) as FLB code style


@PettitWesley
Contributor

I think I understand. IMDS returns 404 when tagging is not enabled, which counts as a non-retryable error.

I think this PR and code is fine as is. However, remember the nitro case you brought up. Where a user can enable tags while FLB is running without restarting the instance.

I think those users are the ones who might want this retry feature with a retry_interval_s. Whereas, I am not sure how or why users not on nitro would want the retry interval.

AFAICT, there are two features here:

  1. Refactoring to separate metadata groups so that one can fail and the others succeed, and the plugin will still add what metadata it has and continue. <-- I'd ship this part of it
  2. Adding a retry_interval_s config option. I still do not see the use case for this. When will a user configure it? Could we just have some default value for now? I'm wondering if nitro users would want this, and would want to enable retries by setting the interval, whereas other users might want no retries and thus would want a default no_retries setting or some option.

What do you think?

My initial version of this PR is a 'proposal'. Minimal effort to handle 500s.

So if we ship this as is, the use case is that it will retry if IMDS returns a 500?

I think that's very useful and thank you for adding it. I am not sure if it justifies a new config option. I suppose configurability is better than nothing, but I am struggling to imagine how users pick their desired value. What impacts the value you want? Why did you choose 5s as the default?

@mwarzynski
Contributor Author

@PettitWesley Hey! Thank you for checking it out.

I think I understand. IMDS returns 404 when tagging is not enabled, which counts as a non-retryable error.

Yes, correct. Fluent Bit also produced a warning about this case (taken from your logs):

[2024/03/18 22:21:41] [ warn] [filter:aws:aws.0] EC2 instance metadata tag request returned 404. This likely indicates your instance has no tags or the EC2 tagging metadata API is not enabled

AFAICT, there are two features here:

Yes. From Fluent Bit's behaviour perspective, there are two main goals/deliverables in this PR:

  1. attach as many AWS metadata groups as Fluent Bit is able to fetch (i.e. do not break on the first metadata group fetch error),
  2. retry failed metadata group fetches during consecutive flushes.

One of the value propositions here, as I perceive it, is handling errors related to fetching AWS metadata groups. I have experienced issues in the past with our in-house component due to EC2 instances not being able to connect to the AWS metadata server. This was difficult to debug, because all errors concluded with a message that the AWS metadata server was 'not reachable'. Ultimately we didn't find the root cause; we have a bug in the backlog and are monitoring whether it happens again. (We didn't find anything helpful for troubleshooting in the AWS documentation, but maybe we missed something.) To be honest, this PR would probably not help us avoid that bug, as retries would still consistently hit the 'not reachable' error. That doesn't imply the PR won't help in other cases, though: our component might have automatically recovered from similar transient errors thanks to the retry strategy already in place, which is why I might not know about such situations.

In general, I believe there might be various technical issues while performing fetches for aws metadata group. Therefore adding simple retries to mitigate these errors seems like a good solution.
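The per-group behaviour described in goal (1) can be sketched like this (hypothetical names, not the exact PR code): each group is attempted independently, a failure does not abort the others, and failed groups remain eligible for retry on a later flush.

```c
/* Sketch: attempt every metadata group independently, keep whatever
 * succeeded, and leave failed groups to be retried on a later flush. */

#define N_GROUPS 3

struct group_state {
    int done;                           /* fetched successfully */
};

/* A fetcher returns 0 on success, -1 on error (e.g. an IMDS 500). */
typedef int (*fetch_fn)(int group_id);

/* Returns the number of groups available for injection after this pass. */
static int fetch_all_groups(struct group_state *groups, fetch_fn fetch)
{
    int i;
    int available = 0;

    for (i = 0; i < N_GROUPS; i++) {
        if (!groups[i].done && fetch(i) == 0) {
            groups[i].done = 1;         /* success; never refetched */
        }
        if (groups[i].done) {
            available++;                /* failures do not block others */
        }
    }
    return available;
}

/* Example fetcher: group 1 always fails, as if tags returned a 500. */
static int example_fetch(int group_id)
{
    return group_id == 1 ? -1 : 0;
}
```

With the pre-PR behaviour, a single failing group would have meant zero groups injected; here the two healthy groups are still available, and the failing one is simply attempted again on the next pass.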

What do you think?

I assume your question was regarding the following topics. (If there are additional things you meant, feel free to ask.)

  1. Should 'retry' be enabled for all use cases?

I believe retries should be enabled by default for all use cases (not only Nitro EC2 instances, but also usual EC2 instances). It's the same line of thinking: there might be an issue with fetching metadata groups => it would be nice to automatically retry fetching the metadata groups and ensure we do our best to push enriched logs.

Retry attempts are made, at worst, once per flush, so even if retries fail at every attempt, there should be no noticeable performance effect overall. (Well, okay, it's a hypothesis; we might confirm this via a real-world test.) I struggle to see a good argument in favor of disabling the retries.

  2. Should 'retry_interval_s' be configurable by the end user?

You can make a decision.

Introducing complexity in a popular project isn't the same as doing so in a hobby project. You will have to maintain this config option in the future (and take care of the communication with the users). I guess this means we want to add a new option only if we absolutely need to.

  3. What value for 'retry_interval_s' would make sense for EC2 Nitro users?

Note that the current PR does not support the use case of dynamic changes in the AWS metadata group values.
For instance, if Fluent Bit fetches all metadata groups correctly on the first try, it won't attempt to fetch them again -- the metadata groups won't be refreshed even if the user changes them on the Nitro instance.

Let's imagine that we actually have such solution of refreshing the aws metadata groups (tags) in place for Nitro EC2 instances.

Unfortunately, no hard assumption can be made about when Fluent Bit (FB) will refresh the log labels (as those refreshes may fail). My line of thinking is: in order to ensure logs always have the expected labels, the user's solution must make the components running inside the EC2 Nitro instance produce logs with those expected critical labels themselves. In other words, the design for adding labels to the logs would be such that Nitro 'dynamic' tags are never 'critical' to the observability of the solution (those labels are only 'nice to have'). This implies that re-fetches of dynamic tags could be done on the order of minutes, so my proposal for this case would be 1m as the default.


So if we ship this as is, the use case is that it will retry if IMDS returns a 500?

Yes, it will retry for 500 errors, 'not reachable' errors, or anything that's not expected.

Why did you choose 5s as the default?

I picked 5s as a rule of thumb; it looked good based on my intuition. There is no cleverness behind it / it might be stupid. Basically, I didn't want to retry at every flush (=1s), as in the case of persistent errors fetching metadata groups, Fluent Bit would produce a lot of error logs; 5x less is 5s. This logic is obviously flawed, as users can set a different flush period or may not be sensitive to the amount of error logs produced by Fluent Bit.

So, what decision do we want to make? Do we remove the config option for retry_interval_s and hardcode it by default to 5s (or 10s/1s)?


Well, this comment is pretty long. If you prefer me to be more concise, just let me know. (After all, those LLMs might be useful for something).

@PettitWesley PettitWesley merged commit 41ef155 into fluent:master May 3, 2024
42 checks passed
@PettitWesley
Contributor

@mwarzynski I've merged this into master.

@mwarzynski mwarzynski deleted the filter-aws-ec2-tags-handle-500-errors branch May 4, 2024 00:16
@mwarzynski
Contributor Author

Thank you!

@PettitWesley Do we want to adjust the documentation of the config parameters? fluent/fluent-bit-docs#1138

@PettitWesley
Contributor

@mwarzynski we must wait to merge the docs until this is released. Please watch the release notes or tags for a new one to be cut.
