Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_kafka: Make back pressure workable and add a mechanism of polling threshold #8174

Merged

Conversation

cosmo0920
Copy link
Contributor

@cosmo0920 cosmo0920 commented Nov 14, 2023

We need to register pause/resume callback for correct handling of back pressure in in_kafka.
Back pressure implementation of Fluent Bit is relying on pause/resume callback which is registered in the each of plugins.
Also, the current implementation of in_kafka does not consider the threshold to halt polling after reaching the limit of chunks.
It causes always over limit status of chunks. This shouldn't happen if the parameters are correctly set up.
I implemented restrictive polling in in_kafka. This shouldn't cause spikes of memory consumption.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
[INPUT]
        Name        kafka
        Brokers     127.0.0.1:9092
        Topics      test
        rdkafka.auto.offset.reset earliest
        # To increase flow rate, specify larger value into buffer_max_size
        # buffer_max_size 5M
  • Debug log output from testing the change
    The above config is effective to restrict polling records at once:
<snip>
[2023/11/15 19:01:56] [debug] [input:kafka:kafka.0] kafka message received
[2023/11/15 19:01:56] [debug] [input:kafka:kafka.0] kafka message received
[2023/11/15 19:01:56] [debug] [input:kafka:kafka.0] kafka message received
[2023/11/15 19:01:56] [debug] [input chunk] update output instances with new chunk size diff=5000544, records=2328, input=kafka.0
[2023/11/15 19:01:56] [ warn] [input] kafka.0 paused (mem buf overlimit)
[2023/11/15 19:01:56] [ info] [input] pausing kafka.0
[2023/11/15 19:01:57] [debug] [task] created task=0x7f5db8071fc0 id=0 OK
[2023/11/15 19:01:57] [debug] [output:splunk:splunk.0] task_id=0 assigned to thread #1
[2023/11/15 19:01:57] [debug] [upstream] KA connection #73 to 127.0.0.1:8443 has been assigned (recycled)
[2023/11/15 19:01:57] [debug] [http_client] not using http_proxy for header
[2023/11/15 19:01:57] [debug] [upstream] KA connection #73 to 127.0.0.1:8443 is now available
[2023/11/15 19:01:57] [debug] [out flush] cb_destroy coro_id=20
[2023/11/15 19:01:57] [debug] [task] destroy task=0x7f5db8071fc0 (task_id=0)
[2023/11/15 19:01:57] [ info] [input] kafka.0 resume (mem buf overlimit)
<snip>

Also, when specifying buffer_chunk_size, this parameter is effective to restrict polling events at once:

<snip>
[2023/11/15 19:05:05] [debug] [input:kafka:kafka.0] kafka message received
[2023/11/15 19:05:05] [debug] [input:kafka:kafka.0] kafka message received
[2023/11/15 19:05:05] [debug] [input:kafka:kafka.0] kafka message received
[2023/11/15 19:05:05] [debug] [input:kafka:kafka.0] kafka message received
[2023/11/15 19:05:05] [debug] [input:kafka:kafka.0] kafka message received
[2023/11/15 19:05:05] [debug] [input:kafka:kafka.0] kafka message received
[2023/11/15 19:05:05] [debug] [input:kafka:kafka.0] kafka message received
[2023/11/15 19:05:05] [debug] [input:kafka:kafka.0] kafka message received
[2023/11/15 19:05:05] [debug] [input:kafka:kafka.0] kafka message received
[2023/11/15 19:05:05] [debug] [input:kafka:kafka.0] kafka message received
[2023/11/15 19:05:05] [debug] [input chunk] update output instances with new chunk size diff=5017600, records=2328, input=kafka.0
[2023/11/15 19:05:05] [debug] [task] destroy task=0x7faa6407f2e0 (task_id=0)
<snip>
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

fluent/fluent-bit-docs#1257

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@cosmo0920 cosmo0920 changed the title in_kafka: Register pause/resume callbacks to handle back pressure correctly in_kafka: Handle mem_buf_limit correctly and add buffer_chunk_size parameter to provide polling size of threshold Nov 15, 2023
@cosmo0920 cosmo0920 marked this pull request as ready for review November 15, 2023 10:21
@cosmo0920 cosmo0920 changed the title in_kafka: Handle mem_buf_limit correctly and add buffer_chunk_size parameter to provide polling size of threshold in_kafka: Make back pressure workable and add a mechanism of polling threshold Nov 15, 2023
…g topics

Referring Mem_Buf_Limit and relying on pause/resume should cause
performance degredation.
Only relying on buffer_max_size should be better to adjust flow rate.

Signed-off-by: Hiroshi Hatake <[email protected]>
@cosmo0920 cosmo0920 force-pushed the cosmo0920-register-pause-and-resume-callbacks-on-in_kafka branch from 5f18744 to 6ff7f73 Compare November 17, 2023 11:32
@edsiper edsiper merged commit ac071a8 into master Dec 19, 2023
48 of 49 checks passed
@edsiper edsiper deleted the cosmo0920-register-pause-and-resume-callbacks-on-in_kafka branch December 19, 2023 19:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants