Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_forward: Recreate connection when resumed #9605

Merged

Conversation

cosmo0920
Copy link
Contributor

@cosmo0920 cosmo0920 commented Nov 18, 2024

When in_forward is paused, the remaining connections are discarded and disposed.
The status check should be aligned and protected with mutex lock.
This causes SEGV when resuming from the pause status of in_forward.

This is because in_fw_collect checks the state of ctx->is_paused. And is_paused is FLB_TRUE, in_forward plugins just gives up to accept tcp connections.

I also added to prevent thread switching during the checking status of pausing/resuming.

Closes #9443.
Closes #9288.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
[INPUT]
    Name                forward
    Listen              127.0.0.1
    Port                24224
    Buffer_Chunk_Size   512KB
    Buffer_Max_Size     2MB
    Mem_Buf_Limit       10MB

[OUTPUT]
    Name null

If you have a capability to use Fluentd for causing flood of messages constantly exceeding the limit of max size for chunks,
you can use the following configuration:

<system>
  workers 7
</system>
<source>
  @type sample
  tag test
  sample "{\"message\":\"#{'hello' * 1024}\"}"
  rate 370
</source>

<match test>
  @type forward

  <server>
    # first server
    host 127.0.0.1
    port 24224
  </server>

  flush_interval 0
  send_timeout 60
  heartbeat_type udp
  heartbeat_interval 1
  recover_wait 10
  hard_timeout 60
  expire_dns_cache nil
  phi_threshold 16
  phi_failure_detector true
  require_ack_response true
</match>
  • Debug log output from testing the change

With this patch, we can resume properly from the paused status:

<snip>
[2024/12/09 20:35:19] [ info] [output:null:null.0] worker #0 started
[2024/12/09 20:35:25] [debug] [task] created task=0x79068001db50 id=0 OK
[2024/12/09 20:35:25] [debug] [task] created task=0x79068001dc20 id=1 OK
[2024/12/09 20:35:25] [debug] [task] created task=0x79068001dd50 id=2 OK
[2024/12/09 20:35:25] [debug] [output:null:null.0] task_id=0 assigned to thread #0
[2024/12/09 20:35:25] [debug] [output:null:null.0] task_id=1 assigned to thread #0
[2024/12/09 20:35:25] [debug] [output:null:null.0] task_id=2 assigned to thread #0
[2024/12/09 20:35:25] [debug] [output:null:null.0] discarding 2854365 bytes
[2024/12/09 20:35:25] [debug] [output:null:null.0] discarding 2854365 bytes
[2024/12/09 20:35:25] [debug] [out flush] cb_destroy coro_id=0
[2024/12/09 20:35:25] [debug] [out flush] cb_destroy coro_id=1
[2024/12/09 20:35:25] [debug] [output:null:null.0] discarding 951455 bytes
[2024/12/09 20:35:25] [debug] [out flush] cb_destroy coro_id=2
[2024/12/09 20:35:25] [debug] [task] destroy task=0x79068001db50 (task_id=0)
[2024/12/09 20:35:25] [debug] [task] destroy task=0x79068001dc20 (task_id=1)
[2024/12/09 20:35:25] [debug] [task] destroy task=0x79068001dd50 (task_id=2)
[2024/12/09 20:35:26] [ warn] [input] forward.0 paused (mem buf overlimit)
[2024/12/09 20:35:26] [ info] [input] pausing forward.0
[2024/12/09 20:35:26] [debug] [input chunk] forward.0 is paused, cannot append records
[2024/12/09 20:35:26] [error] [input:forward:forward.0] could not append logs. ret=-1
[2024/12/09 20:35:26] [debug] [task] created task=0x79068001d6f0 id=0 OK
[2024/12/09 20:35:26] [debug] [task] created task=0x79068001d7c0 id=1 OK
[2024/12/09 20:35:26] [debug] [task] created task=0x79068001ccd0 id=2 OK
[2024/12/09 20:35:26] [debug] [output:null:null.0] task_id=0 assigned to thread #0
[2024/12/09 20:35:26] [debug] [output:null:null.0] task_id=1 assigned to thread #0
[2024/12/09 20:35:26] [debug] [output:null:null.0] task_id=2 assigned to thread #0
[2024/12/09 20:35:26] [debug] [output:null:null.0] discarding 3805820 bytes
[2024/12/09 20:35:26] [debug] [output:null:null.0] discarding 3805820 bytes
[2024/12/09 20:35:26] [debug] [out flush] cb_destroy coro_id=3
[2024/12/09 20:35:26] [debug] [out flush] cb_destroy coro_id=4
[2024/12/09 20:35:26] [debug] [output:null:null.0] discarding 3805820 bytes
[2024/12/09 20:35:26] [debug] [out flush] cb_destroy coro_id=5
[2024/12/09 20:35:26] [debug] [task] destroy task=0x79068001d6f0 (task_id=0)
[2024/12/09 20:35:26] [ info] [input] resume forward.0
[2024/12/09 20:35:26] [ info] [input] forward.0 resume (mem buf overlimit)
[2024/12/09 20:35:26] [debug] [task] destroy task=0x79068001d7c0 (task_id=1)
[2024/12/09 20:35:26] [debug] [task] destroy task=0x79068001ccd0 (task_id=2)
[2024/12/09 20:35:27] [ warn] [input] forward.0 paused (mem buf overlimit)
[2024/12/09 20:35:27] [ info] [input] pausing forward.0
[2024/12/09 20:35:27] [debug] [task] created task=0x79068137f5c0 id=0 OK
[2024/12/09 20:35:27] [debug] [task] created task=0x79068137f660 id=1 OK
[2024/12/09 20:35:27] [debug] [task] created task=0x79068137f730 id=2 OK
[2024/12/09 20:35:27] [debug] [output:null:null.0] task_id=0 assigned to thread #0
[2024/12/09 20:35:27] [debug] [output:null:null.0] task_id=1 assigned to thread #0
[2024/12/09 20:35:27] [debug] [output:null:null.0] task_id=2 assigned to thread #0
[2024/12/09 20:35:27] [debug] [output:null:null.0] discarding 3805820 bytes
[2024/12/09 20:35:27] [debug] [output:null:null.0] discarding 3805820 bytes
[2024/12/09 20:35:27] [debug] [out flush] cb_destroy coro_id=6
[2024/12/09 20:35:27] [debug] [out flush] cb_destroy coro_id=7
<snip>

Full log:

Fluent Bit v3.2.3
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

______ _                  _    ______ _ _           _____  _____ 
|  ___| |                | |   | ___ (_) |         |____ |/ __  \
| |_  | |_   _  ___ _ __ | |_  | |_/ /_| |_  __   __   / /`' / /'
|  _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / /   \ \  / /  
| |   | | |_| |  __/ | | | |_  | |_/ / | |_   \ V /.___/ /./ /___
\_|   |_|\__,_|\___|_| |_|\__| \____/|_|\__|   \_/ \____(_)_____/


[2024/12/09 20:35:41] [ info] Configuration:
[2024/12/09 20:35:41] [ info]  flush time     | 1.000000 seconds
[2024/12/09 20:35:41] [ info]  grace          | 5 seconds
[2024/12/09 20:35:41] [ info]  daemon         | 0
[2024/12/09 20:35:41] [ info] ___________
[2024/12/09 20:35:41] [ info]  inputs:
[2024/12/09 20:35:41] [ info]      forward
[2024/12/09 20:35:41] [ info] ___________
[2024/12/09 20:35:41] [ info]  filters:
[2024/12/09 20:35:41] [ info] ___________
[2024/12/09 20:35:41] [ info]  outputs:
[2024/12/09 20:35:41] [ info]      null.0
[2024/12/09 20:35:41] [ info] ___________
[2024/12/09 20:35:41] [ info]  collectors:
[2024/12/09 20:35:41] [ info] [fluent bit] version=3.2.3, commit=65c5bc20af, pid=284071
[2024/12/09 20:35:41] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2024/12/09 20:35:41] [ info] [storage] ver=1.1.6, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/12/09 20:35:41] [ info] [simd    ] disabled
[2024/12/09 20:35:41] [ info] [cmetrics] version=0.9.9
[2024/12/09 20:35:41] [ info] [ctraces ] version=0.5.7
[2024/12/09 20:35:41] [ info] [input:forward:forward.0] initializing
[2024/12/09 20:35:41] [ info] [input:forward:forward.0] storage_strategy='memory' (memory only)
[2024/12/09 20:35:41] [debug] [forward:forward.0] created event channels: read=25 write=26
[2024/12/09 20:35:41] [debug] [in_fw] Listen='127.0.0.1' TCP_Port=24224
[2024/12/09 20:35:41] [debug] [downstream] listening on 127.0.0.1:24224
[2024/12/09 20:35:41] [ info] [input:forward:forward.0] listening on 127.0.0.1:24224
[2024/12/09 20:35:41] [debug] [null:null.0] created event channels: read=28 write=29
[2024/12/09 20:35:41] [ info] [sp] stream processor started
[2024/12/09 20:35:41] [ info] [output:null:null.0] worker #0 started
[2024/12/09 20:35:45] [debug] [task] created task=0x5eaa8b0 id=0 OK
[2024/12/09 20:35:45] [debug] [task] created task=0x5eab070 id=1 OK
[2024/12/09 20:35:45] [debug] [task] created task=0x5eab830 id=2 OK
[2024/12/09 20:35:45] [debug] [output:null:null.0] task_id=0 assigned to thread #0
[2024/12/09 20:35:45] [debug] [output:null:null.0] task_id=1 assigned to thread #0
[2024/12/09 20:35:45] [debug] [output:null:null.0] task_id=2 assigned to thread #0
[2024/12/09 20:35:45] [debug] [output:null:null.0] discarding 2854365 bytes
[2024/12/09 20:35:45] [debug] [output:null:null.0] discarding 2854365 bytes
[2024/12/09 20:35:45] [debug] [out flush] cb_destroy coro_id=0
[2024/12/09 20:35:45] [debug] [out flush] cb_destroy coro_id=1
[2024/12/09 20:35:45] [debug] [output:null:null.0] discarding 1902910 bytes
[2024/12/09 20:35:45] [debug] [out flush] cb_destroy coro_id=2
[2024/12/09 20:35:45] [debug] [task] destroy task=0x5eaa8b0 (task_id=0)
[2024/12/09 20:35:45] [debug] [task] destroy task=0x5eab070 (task_id=1)
[2024/12/09 20:35:45] [debug] [task] destroy task=0x5eab830 (task_id=2)
[2024/12/09 20:35:46] [debug] [task] created task=0x7aa6020 id=0 OK
[2024/12/09 20:35:46] [debug] [task] created task=0x5eaa8b0 id=1 OK
[2024/12/09 20:35:46] [debug] [output:null:null.0] task_id=0 assigned to thread #0
[2024/12/09 20:35:46] [debug] [output:null:null.0] task_id=1 assigned to thread #0
[2024/12/09 20:35:46] [debug] [output:null:null.0] discarding 3805820 bytes
[2024/12/09 20:35:46] [ warn] [input] forward.0 paused (mem buf overlimit)
[2024/12/09 20:35:46] [ info] [input] pausing forward.0
[2024/12/09 20:35:46] [debug] [output:null:null.0] discarding 3805820 bytes
[2024/12/09 20:35:46] [debug] [out flush] cb_destroy coro_id=3
[2024/12/09 20:35:46] [debug] [out flush] cb_destroy coro_id=4
[2024/12/09 20:35:46] [debug] [task] destroy task=0x7aa6020 (task_id=0)
[2024/12/09 20:35:46] [ info] [input] resume forward.0
[2024/12/09 20:35:46] [ info] [input] forward.0 resume (mem buf overlimit)
[2024/12/09 20:35:46] [debug] [task] destroy task=0x5eaa8b0 (task_id=1)
[2024/12/09 20:35:47] [ warn] [input] forward.0 paused (mem buf overlimit)
[2024/12/09 20:35:47] [ info] [input] pausing forward.0
[2024/12/09 20:35:47] [debug] [input chunk] forward.0 is paused, cannot append records
[2024/12/09 20:35:47] [error] [input:forward:forward.0] could not append logs. ret=-1
[2024/12/09 20:35:47] [debug] [task] created task=0x5eaa8b0 id=0 OK
[2024/12/09 20:35:47] [debug] [task] created task=0x5eab830 id=1 OK
[2024/12/09 20:35:47] [debug] [task] created task=0x5eac750 id=2 OK
[2024/12/09 20:35:47] [debug] [output:null:null.0] task_id=0 assigned to thread #0
[2024/12/09 20:35:47] [debug] [output:null:null.0] task_id=1 assigned to thread #0
[2024/12/09 20:35:47] [debug] [output:null:null.0] task_id=2 assigned to thread #0
[2024/12/09 20:35:47] [debug] [output:null:null.0] discarding 3805820 bytes
[2024/12/09 20:35:47] [debug] [input chunk] forward.0 is paused, cannot append records
[2024/12/09 20:35:47] [error] [input:forward:forward.0] could not append logs. ret=-1
[2024/12/09 20:35:47] [debug] [output:null:null.0] discarding 3805820 bytes
[2024/12/09 20:35:47] [debug] [out flush] cb_destroy coro_id=5
[2024/12/09 20:35:47] [debug] [out flush] cb_destroy coro_id=6
[2024/12/09 20:35:47] [debug] [output:null:null.0] discarding 3805820 bytes
[2024/12/09 20:35:47] [debug] [input chunk] forward.0 is paused, cannot append records
[2024/12/09 20:35:47] [debug] [out flush] cb_destroy coro_id=7
[2024/12/09 20:35:47] [error] [input:forward:forward.0] could not append logs. ret=-1
[2024/12/09 20:35:47] [debug] [task] destroy task=0x5eaa8b0 (task_id=0)
[2024/12/09 20:35:47] [ info] [input] resume forward.0
[2024/12/09 20:35:47] [ info] [input] forward.0 resume (mem buf overlimit)
[2024/12/09 20:35:47] [debug] [task] destroy task=0x5eab830 (task_id=1)
[2024/12/09 20:35:47] [debug] [task] destroy task=0x5eac750 (task_id=2)
[2024/12/09 20:35:48] [debug] [task] created task=0x5eb80d0 id=0 OK
[2024/12/09 20:35:48] [debug] [task] created task=0x5eaa8b0 id=1 OK
[2024/12/09 20:35:48] [debug] [task] created task=0x5eab070 id=2 OK
[2024/12/09 20:35:48] [debug] [output:null:null.0] discarding 3805820 bytes
[2024/12/09 20:35:48] [debug] [output:null:null.0] task_id=0 assigned to thread #0
[2024/12/09 20:35:48] [debug] [out flush] cb_destroy coro_id=8
[2024/12/09 20:35:48] [debug] [output:null:null.0] task_id=1 assigned to thread #0
[2024/12/09 20:35:48] [debug] [output:null:null.0] discarding 3805820 bytes
[2024/12/09 20:35:48] [debug] [output:null:null.0] task_id=2 assigned to thread #0
[2024/12/09 20:35:48] [debug] [out flush] cb_destroy coro_id=9
[2024/12/09 20:35:48] [debug] [task] destroy task=0x5eb80d0 (task_id=0)
[2024/12/09 20:35:48] [debug] [output:null:null.0] discarding 1902910 bytes
[2024/12/09 20:35:48] [debug] [task] destroy task=0x5eaa8b0 (task_id=1)
[2024/12/09 20:35:48] [debug] [out flush] cb_destroy coro_id=10
[2024/12/09 20:35:48] [debug] [task] destroy task=0x5eab070 (task_id=2)
[2024/12/09 20:35:49] [debug] [task] created task=0x5eab830 id=0 OK
[2024/12/09 20:35:49] [debug] [task] created task=0x5eaa8b0 id=1 OK
[2024/12/09 20:35:49] [debug] [output:null:null.0] task_id=0 assigned to thread #0
[2024/12/09 20:35:49] [debug] [output:null:null.0] task_id=1 assigned to thread #0
[2024/12/09 20:35:49] [debug] [output:null:null.0] discarding 3805820 bytes
[2024/12/09 20:35:49] [debug] [output:null:null.0] discarding 3615529 bytes
[2024/12/09 20:35:49] [debug] [out flush] cb_destroy coro_id=11
[2024/12/09 20:35:49] [debug] [out flush] cb_destroy coro_id=12
[2024/12/09 20:35:49] [debug] [task] destroy task=0x5eab830 (task_id=0)
[2024/12/09 20:35:49] [debug] [task] destroy task=0x5eaa8b0 (task_id=1)
[2024/12/09 20:35:50] [debug] [task] created task=0x5eac750 id=0 OK
[2024/12/09 20:35:50] [debug] [task] created task=0x5eaa8b0 id=1 OK
[2024/12/09 20:35:50] [debug] [output:null:null.0] task_id=0 assigned to thread #0
[2024/12/09 20:35:50] [debug] [output:null:null.0] task_id=1 assigned to thread #0
[2024/12/09 20:35:50] [debug] [output:null:null.0] discarding 3805820 bytes
[2024/12/09 20:35:50] [debug] [output:null:null.0] discarding 3805820 bytes
[2024/12/09 20:35:50] [debug] [out flush] cb_destroy coro_id=13
[2024/12/09 20:35:50] [debug] [task] destroy task=0x5eac750 (task_id=0)
[2024/12/09 20:35:50] [debug] [out flush] cb_destroy coro_id=14
[2024/12/09 20:35:50] [debug] [task] destroy task=0x5eaa8b0 (task_id=1)
[2024/12/09 20:35:51] [debug] [task] created task=0x5eac750 id=0 OK
[2024/12/09 20:35:51] [debug] [task] created task=0x5eaa8b0 id=1 OK
[2024/12/09 20:35:51] [debug] [output:null:null.0] task_id=0 assigned to thread #0
[2024/12/09 20:35:51] [debug] [output:null:null.0] task_id=1 assigned to thread #0
[2024/12/09 20:35:51] [debug] [output:null:null.0] discarding 3425238 bytes
[2024/12/09 20:35:51] [debug] [output:null:null.0] discarding 3805820 bytes
[2024/12/09 20:35:51] [debug] [out flush] cb_destroy coro_id=15
[2024/12/09 20:35:51] [debug] [out flush] cb_destroy coro_id=16
[2024/12/09 20:35:51] [debug] [task] destroy task=0x5eac750 (task_id=0)
[2024/12/09 20:35:51] [debug] [task] destroy task=0x5eaa8b0 (task_id=1)
[2024/12/09 20:35:52] [debug] [task] created task=0x5eaa8b0 id=0 OK
[2024/12/09 20:35:52] [debug] [task] created task=0x5eab830 id=1 OK
[2024/12/09 20:35:52] [debug] [task] created task=0x5eac750 id=2 OK
[2024/12/09 20:35:52] [debug] [output:null:null.0] task_id=0 assigned to thread #0
[2024/12/09 20:35:52] [debug] [output:null:null.0] discarding 3805820 bytes
[2024/12/09 20:35:52] [debug] [output:null:null.0] task_id=1 assigned to thread #0
[2024/12/09 20:35:52] [debug] [output:null:null.0] task_id=2 assigned to thread #0
[2024/12/09 20:35:52] [debug] [output:null:null.0] discarding 2473783 bytes
[2024/12/09 20:35:52] [debug] [out flush] cb_destroy coro_id=17
[2024/12/09 20:35:52] [debug] [out flush] cb_destroy coro_id=18
[2024/12/09 20:35:52] [debug] [output:null:null.0] discarding 2664074 bytes
[2024/12/09 20:35:52] [debug] [out flush] cb_destroy coro_id=19
[2024/12/09 20:35:52] [debug] [task] destroy task=0x5eaa8b0 (task_id=0)
[2024/12/09 20:35:52] [debug] [task] destroy task=0x5eab830 (task_id=1)
[2024/12/09 20:35:52] [debug] [task] destroy task=0x5eac750 (task_id=2)
^C[2024/12/09 20:35:52] [engine] caught signal (SIGINT)
[2024/12/09 20:35:52] [ warn] [engine] service will shutdown in max 5 seconds
[2024/12/09 20:35:52] [ info] [input] pausing forward.0
[2024/12/09 20:35:53] [ info] [engine] service has stopped (0 pending tasks)
[2024/12/09 20:35:53] [ info] [input] pausing forward.0
[2024/12/09 20:35:53] [ info] [output:null:null.0] thread worker #0 stopping...
[2024/12/09 20:35:53] [ info] [output:null:null.0] thread worker #0 stopped
  • Attached Valgrind output that shows no leaks or memory corruption was found
==284071== 
==284071== HEAP SUMMARY:
==284071==     in use at exit: 0 bytes in 0 blocks
==284071==   total heap usage: 7,162 allocs, 7,162 frees, 741,887,031 bytes allocated
==284071== 
==284071== All heap blocks were freed -- no leaks are possible
==284071== 
==284071== For lists of detected and suppressed errors, rerun with: -s
==284071== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@cosmo0920 cosmo0920 marked this pull request as ready for review November 19, 2024 06:51
@cosmo0920 cosmo0920 added this to the Fluent Bit v3.2.2 milestone Nov 20, 2024
@edsiper edsiper merged commit dee6d12 into master Dec 13, 2024
52 checks passed
@edsiper edsiper deleted the cosmo0920-in_forward-recreate-connection-on-in_forward branch December 13, 2024 01:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
2 participants