Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter_parser: refactoring. simplify msgpack loop. #7380

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

nokute78
Copy link
Collaborator

This patch is updated #6832 for v2.1.

From

   if (obj->type == MSGPACK_OBJECT_MAP) {
       /* filtering */
   }
   else {
       continue;
   }

To
   if (obj->type != MSGPACK_OBJECT_MAP) {
       continue;
   }
   /* filtering */

Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • [N/A] Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • [N/A] Run local packaging test showing all targets (including any new ones) build.
  • [N/A] Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • [N/A] Documentation required for this feature

Backporting

  • [N/A] Backport to latest stable release.

Debug/Valgrind output

$ valgrind --leak-check=full bin/flb-rt-filter_parser 
==12073== Memcheck, a memory error detector
==12073== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==12073== Using Valgrind-3.18.1 and LibVEX; rerun with -h for copyright info
==12073== Command: bin/flb-rt-filter_parser
==12073== 
Test filter_parser_extract_fields...            [2023/05/14 08:40:36] [ info] [fluent bit] version=2.1.3, commit=4398840ed5, pid=12073
[2023/05/14 08:40:36] [ info] [storage] ver=1.2.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/05/14 08:40:36] [ info] [cmetrics] version=0.6.1
[2023/05/14 08:40:36] [ info] [ctraces ] version=0.3.0
[2023/05/14 08:40:36] [ info] [input:lib:lib.0] initializing
[2023/05/14 08:40:36] [ info] [input:lib:lib.0] storage_strategy='memory' (memory only)
[2023/05/14 08:40:36] [ info] [sp] stream processor started
[2023/05/14 08:40:37] [ warn] [engine] service will shutdown in max 5 seconds
[2023/05/14 08:40:38] [ info] [engine] service has stopped (0 pending tasks)
[ OK ]
Test filter_parser_reserve_data_off...          [2023/05/14 08:40:38] [ info] [fluent bit] version=2.1.3, commit=4398840ed5, pid=12073
[2023/05/14 08:40:38] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2023/05/14 08:40:38] [ info] [storage] ver=1.2.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/05/14 08:40:38] [ info] [cmetrics] version=0.6.1
[2023/05/14 08:40:38] [ info] [ctraces ] version=0.3.0
[2023/05/14 08:40:38] [ info] [input:lib:lib.0] initializing
[2023/05/14 08:40:38] [ info] [input:lib:lib.0] storage_strategy='memory' (memory only)
[2023/05/14 08:40:38] [debug] [lib:lib.0] created event channels: read=30 write=31
[2023/05/14 08:40:38] [debug] [lib:lib.0] created event channels: read=34 write=35
[2023/05/14 08:40:38] [ info] [sp] stream processor started
[2023/05/14 08:40:38] [debug] [input chunk] update output instances with new chunk size diff=76
[2023/05/14 08:40:39] [debug] [task] created task=0x5584810 id=0 OK
[2023/05/14 08:40:39] [debug] [test_filter_parser] received message: [1448403340.000000,{"INT":"100","FLOAT":"0.5","BOOL":"true","STRING":"This is an example"}]
[2023/05/14 08:40:39] [debug] [out flush] cb_destroy coro_id=0
[2023/05/14 08:40:39] [debug] [task] destroy task=0x5584810 (task_id=0)
[2023/05/14 08:40:39] [ warn] [engine] service will shutdown in max 1 seconds
[2023/05/14 08:40:40] [ info] [engine] service has stopped (0 pending tasks)
[ OK ]
Test filter_parser_handle_time_key...           [2023/05/14 08:40:40] [ info] [fluent bit] version=2.1.3, commit=4398840ed5, pid=12073
[2023/05/14 08:40:40] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2023/05/14 08:40:40] [ info] [storage] ver=1.2.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/05/14 08:40:40] [ info] [cmetrics] version=0.6.1
[2023/05/14 08:40:40] [ info] [ctraces ] version=0.3.0
[2023/05/14 08:40:40] [ info] [input:lib:lib.0] initializing
[2023/05/14 08:40:40] [ info] [input:lib:lib.0] storage_strategy='memory' (memory only)
[2023/05/14 08:40:40] [debug] [lib:lib.0] created event channels: read=39 write=40
[2023/05/14 08:40:40] [debug] [lib:lib.0] created event channels: read=43 write=44
[2023/05/14 08:40:40] [ info] [sp] stream processor started
[2023/05/14 08:40:40] [debug] [input chunk] update output instances with new chunk size diff=49
[2023/05/14 08:40:41] [debug] [task] created task=0x56610c0 id=0 OK
[2023/05/14 08:40:41] [debug] [test_filter_parser] received message: [1509575121.648000,{"message":"This is an example"}]
[2023/05/14 08:40:41] [debug] [out flush] cb_destroy coro_id=0
[2023/05/14 08:40:41] [debug] [task] destroy task=0x56610c0 (task_id=0)
[2023/05/14 08:40:41] [ warn] [engine] service will shutdown in max 1 seconds
[2023/05/14 08:40:42] [ info] [engine] service has stopped (0 pending tasks)
[ OK ]
Test filter_parser_handle_time_key_with_time_zone... [2023/05/14 08:40:42] [ info] [fluent bit] version=2.1.3, commit=4398840ed5, pid=12073
[2023/05/14 08:40:42] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2023/05/14 08:40:42] [ info] [storage] ver=1.2.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/05/14 08:40:42] [ info] [cmetrics] version=0.6.1
[2023/05/14 08:40:42] [ info] [ctraces ] version=0.3.0
[2023/05/14 08:40:42] [ info] [input:lib:lib.0] initializing
[2023/05/14 08:40:42] [ info] [input:lib:lib.0] storage_strategy='memory' (memory only)
[2023/05/14 08:40:42] [debug] [lib:lib.0] created event channels: read=48 write=49
[2023/05/14 08:40:42] [debug] [lib:lib.0] created event channels: read=52 write=53
[2023/05/14 08:40:42] [ info] [sp] stream processor started
[2023/05/14 08:40:42] [debug] [input chunk] update output instances with new chunk size diff=49
[2023/05/14 08:40:43] [debug] [task] created task=0x573d930 id=0 OK
[2023/05/14 08:40:43] [debug] [test_filter_parser] received message: [1509589521.648000,{"message":"This is an example"}]
[2023/05/14 08:40:43] [debug] [out flush] cb_destroy coro_id=0
[2023/05/14 08:40:43] [debug] [task] destroy task=0x573d930 (task_id=0)
[2023/05/14 08:40:43] [ warn] [engine] service will shutdown in max 1 seconds
[2023/05/14 08:40:44] [ info] [engine] service has stopped (0 pending tasks)
[ OK ]
Test filter_parser_ignore_malformed_time...     [2023/05/14 08:40:44] [ info] [fluent bit] version=2.1.3, commit=4398840ed5, pid=12073
[2023/05/14 08:40:44] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2023/05/14 08:40:44] [ info] [storage] ver=1.2.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/05/14 08:40:44] [ info] [cmetrics] version=0.6.1
[2023/05/14 08:40:44] [ info] [ctraces ] version=0.3.0
[2023/05/14 08:40:44] [ info] [input:lib:lib.0] initializing
[2023/05/14 08:40:44] [ info] [input:lib:lib.0] storage_strategy='memory' (memory only)
[2023/05/14 08:40:44] [debug] [lib:lib.0] created event channels: read=57 write=58
[2023/05/14 08:40:44] [debug] [lib:lib.0] created event channels: read=61 write=62
[2023/05/14 08:40:44] [ info] [sp] stream processor started
[2023/05/14 08:40:44] [error] [parser] cannot parse '2017_$!^-11-01T22:25:21.648'
[2023/05/14 08:40:44] [ warn] [parser:timestamp] invalid time format %Y-%m-%dT%H:%M:%S.%L for '2017_$!^-11-01T22:25:21.648'
[2023/05/14 08:40:44] [debug] [input chunk] update output instances with new chunk size diff=76
[2023/05/14 08:40:45] [debug] [task] created task=0x57da2a0 id=0 OK
[2023/05/14 08:40:45] [debug] [test_filter_parser] received message: [1448403340.000000,{"@timestamp":"2017_$!^-11-01T22:25:21.648","log":"An example"}]
[2023/05/14 08:40:45] [debug] [out flush] cb_destroy coro_id=0
[2023/05/14 08:40:45] [debug] [task] destroy task=0x57da2a0 (task_id=0)
[2023/05/14 08:40:45] [ warn] [engine] service will shutdown in max 1 seconds
[2023/05/14 08:40:46] [ info] [engine] service has stopped (0 pending tasks)
[ OK ]
Test filter_parser_preserve_original_field...   [2023/05/14 08:40:46] [ info] [fluent bit] version=2.1.3, commit=4398840ed5, pid=12073
[2023/05/14 08:40:46] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2023/05/14 08:40:46] [ info] [storage] ver=1.2.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/05/14 08:40:46] [ info] [cmetrics] version=0.6.1
[2023/05/14 08:40:46] [ info] [ctraces ] version=0.3.0
[2023/05/14 08:40:46] [ info] [input:lib:lib.0] initializing
[2023/05/14 08:40:46] [ info] [input:lib:lib.0] storage_strategy='memory' (memory only)
[2023/05/14 08:40:46] [debug] [lib:lib.0] created event channels: read=66 write=67
[2023/05/14 08:40:46] [debug] [lib:lib.0] created event channels: read=70 write=71
[2023/05/14 08:40:46] [ info] [sp] stream processor started
[2023/05/14 08:40:46] [debug] [input chunk] update output instances with new chunk size diff=128
[2023/05/14 08:40:47] [debug] [task] created task=0x6919e20 id=0 OK
[2023/05/14 08:40:47] [debug] [test_filter_parser] received message: [1448403340.000000,{"INT":"100","FLOAT":"0.5","BOOL":"true","STRING":"This is an example","data":"100 0.5 true This is an example","log":"An example"}]
[2023/05/14 08:40:47] [debug] [out flush] cb_destroy coro_id=0
[2023/05/14 08:40:47] [debug] [task] destroy task=0x6919e20 (task_id=0)
[2023/05/14 08:40:47] [ warn] [engine] service will shutdown in max 1 seconds
[2023/05/14 08:40:48] [ info] [engine] service has stopped (0 pending tasks)
[ OK ]
Test filter_parser_first_matched_when_multiple_parser... [2023/05/14 08:40:48] [ info] [fluent bit] version=2.1.3, commit=4398840ed5, pid=12073
[2023/05/14 08:40:48] [ info] [storage] ver=1.2.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/05/14 08:40:48] [ info] [cmetrics] version=0.6.1
[2023/05/14 08:40:48] [ info] [ctraces ] version=0.3.0
[2023/05/14 08:40:48] [ info] [input:lib:lib.0] initializing
[2023/05/14 08:40:48] [ info] [input:lib:lib.0] storage_strategy='memory' (memory only)
[2023/05/14 08:40:48] [ info] [sp] stream processor started
[2023/05/14 08:40:49] [ warn] [engine] service will shutdown in max 5 seconds
[2023/05/14 08:40:50] [ info] [engine] service has stopped (0 pending tasks)
[ OK ]
Test filter_parser_skip_empty_values_false...   [2023/05/14 08:40:50] [ info] [fluent bit] version=2.1.3, commit=4398840ed5, pid=12073
[2023/05/14 08:40:50] [ info] [storage] ver=1.2.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/05/14 08:40:50] [ info] [cmetrics] version=0.6.1
[2023/05/14 08:40:50] [ info] [ctraces ] version=0.3.0
[2023/05/14 08:40:50] [ info] [input:lib:lib.0] initializing
[2023/05/14 08:40:50] [ info] [input:lib:lib.0] storage_strategy='memory' (memory only)
[2023/05/14 08:40:50] [ info] [sp] stream processor started
[2023/05/14 08:40:51] [ warn] [engine] service will shutdown in max 5 seconds
[2023/05/14 08:40:52] [ info] [engine] service has stopped (0 pending tasks)
[ OK ]
SUCCESS: All unit tests have passed.
==12073== 
==12073== HEAP SUMMARY:
==12073==     in use at exit: 0 bytes in 0 blocks
==12073==   total heap usage: 12,438 allocs, 12,438 frees, 6,334,462 bytes allocated
==12073== 
==12073== All heap blocks were freed -- no leaks are possible
==12073== 
==12073== For lists of detected and suppressed errors, rerun with: -s
==12073== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)


Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

From
   if (obj->type == MSGPACK_OBJECT_MAP) {
       /* filtering */
   }
   else {
       continue;
   }

To
   if (obj->type != MSGPACK_OBJECT_MAP) {
       continue;
   }
   /* filtering */

Signed-off-by: Takahiro Yamashita <[email protected]>
@nokute78 nokute78 temporarily deployed to pr May 13, 2023 23:44 — with GitHub Actions Inactive
@nokute78 nokute78 temporarily deployed to pr May 13, 2023 23:44 — with GitHub Actions Inactive
@nokute78 nokute78 temporarily deployed to pr May 13, 2023 23:44 — with GitHub Actions Inactive
@nokute78 nokute78 temporarily deployed to pr May 14, 2023 00:08 — with GitHub Actions Inactive
Copy link
Contributor

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Dec 10, 2023
@nokute78
Copy link
Collaborator Author

Updated using current master

@github-actions github-actions bot removed the Stale label Dec 17, 2023
Copy link
Contributor

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Mar 18, 2024
@nokute78 nokute78 removed the Stale label Apr 12, 2024
Copy link
Contributor

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Jul 13, 2024
@github-actions github-actions bot removed the Stale label Aug 16, 2024
Copy link
Contributor

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Nov 27, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant