We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
现象: 输入数据文件为/tmp/test.log for i in {1..20}; do echo $i >> /tmp/test.log; done
fluent-bit输出中, 有些数据行被合并, 比如: ... [14] cpu.local: [[1738889482.276986343, {}], {"tag"=>"2", "time"=>(ext: -1)"B\x09\xe7\x9cg\xa5Y\x0a", "message"=>{"log"=>"15"}}] [15] cpu.local: [[1738889482.276986752, {}], {"tag"=>"2", "time"=>(ext: -1)"B\x09\xee\xdd\x00\x00\x00\x02", [(ext: 0)"g\xa5Y\x0a\x10\x82}8", {}]=>{"tag"=>"2", "time"=>(ext: -1)"B\x09\xf4\xe0g\xa5Y\x0a", "message"=>{"log"=>"17"}}}] [16] cpu.local: [[1738889482.276987641, {}], {"tag"=>"2", "time"=>(ext: -1)"B\x09\xfb\xe4g\xa5Y\x0a", "message"=>{"log"=>"18"}}] ...
分析输出数据, time中有些byte是0x00.
fluent-bit代码: plugins/filter_wasm/filter_wasm.c /* cb_filter callback */ static int cb_wasm_filter(const void *data, size_t bytes, const char *tag, int tag_len, void **out_buf, size_t *out_bytes, struct flb_filter_instance *f_ins, struct flb_input_instance *i_ins, void *filter_context, struct flb_config *config) { int ret; char *ret_val = NULL; ... case FLB_FILTER_WASM_FMT_MSGPACK: ... /* Execute WASM program */ ret_val = flb_wasm_call_function_format_msgpack(wasm, ctx->wasm_function_name, tag, tag_len, log_event.timestamp, buf, buf_size); ... case FLB_FILTER_WASM_FMT_MSGPACK: /* msgpack found, pack it msgpack representation */ ret = flb_log_event_encoder_set_body_from_raw_msgpack( &log_encoder, ret_val, strlen(ret_val)); ...
msgpack模式下, 使用char*接收返回值; 如果返回的msgpack数据中间含有0x00字节, 后续调用flb_log_event_encoder_set_body_from_raw_msgpack时以strlen(ret_val)作为长度, 返回值会在第一个0x00处被截断。
wasm filter code: char* c_filter(char* tag, int len, uint32_t sec, uint32_t nsec, char* record, int record_len) { ... mpack_log("data=%s, len=%d\n", record, record_len); ... // time mpack_write_cstr(&writer, "time"); sec=1739005140; nsec=743676544; // debug mpack_write_timestamp(&writer, sec, nsec); ... // 返回数据 if (size != strlen(msg_utf8)) { // 逐字符输出16进制数据 printf("utf8 raw data: "); for(int i = 0; i < size; i++) { printf("%02x ", (unsigned char)msg_utf8[i]); } printf("\n"); printf("utf8 str data: "); for(int i = 0; i < strlen(msg_utf8); i++) { printf("%02x ", (unsigned char)msg_utf8[i]); } printf("\n"); printf("data=%s, len=%zu, strlen(data)=%lu, sec=%d, nsec=%d\n", msg_utf8, size, strlen(msg_utf8), sec, nsec); } return msg_utf8; ...
fluent-bit conf: ... [INPUT] name tail path /tmp/*.log tag cpu.local
[FILTER] Name wasm match * # msgpack/json Event_Format msgpack WASM_Path c_filter.wasm Function_Name c_filter accessible_paths . Wasm_Heap_Size 512000000 ...
test:
fluent-bit output: utf8 raw data: 83 a3 74 61 67 a1 32 a4 74 69 6d 65 d7 ff b1 4e 6a 00 67 a7 1c d4 a7 6d 65 73 73 61 67 65 81 a3 6c 6f 67 a1 31 utf8 str data: 83 a3 74 61 67 a1 32 a4 74 69 6d 65 d7 ff b1 4e 6a data=��tag�2�time����Nj, len=37, strlen(data)=17, sec=1739005140, nsec=743676544
如果filter代码中注释掉time: char* c_filter(char* tag, int len, uint32_t sec, uint32_t nsec, char* record, int record_len) { ... // time //mpack_write_cstr(&writer, "time"); //sec=1739005140; nsec=743676544; // debug //mpack_write_timestamp(&writer, sec, nsec); ...
继续test:
fluent-bit output: data=��log�1, len=7 data=��tag�2�message��log�log1�, len=25, strlen=28 [0] cpu.local: [[1738894225.067637732, {}], {"tag"=>"2", "message"=>{"log"=>"log1"}}]
The text was updated successfully, but these errors were encountered:
No branches or pull requests
现象:
输入数据文件为/tmp/test.log
for i in {1..20}; do echo $i >> /tmp/test.log; done
fluent-bit输出中, 有些数据行被合并, 比如:
...
[14] cpu.local: [[1738889482.276986343, {}], {"tag"=>"2", "time"=>(ext: -1)"B\x09\xe7\x9cg\xa5Y\x0a", "message"=>{"log"=>"15"}}]
[15] cpu.local: [[1738889482.276986752, {}], {"tag"=>"2", "time"=>(ext: -1)"B\x09\xee\xdd\x00\x00\x00\x02", [(ext: 0)"g\xa5Y\x0a\x10\x82}8", {}]=>{"tag"=>"2", "time"=>(ext: -1)"B\x09\xf4\xe0g\xa5Y\x0a", "message"=>{"log"=>"17"}}}]
[16] cpu.local: [[1738889482.276987641, {}], {"tag"=>"2", "time"=>(ext: -1)"B\x09\xfb\xe4g\xa5Y\x0a", "message"=>{"log"=>"18"}}]
...
分析输出数据, time中有些byte是0x00.
fluent-bit代码: plugins/filter_wasm/filter_wasm.c
/* cb_filter callback */
static int cb_wasm_filter(const void *data, size_t bytes,
const char *tag, int tag_len,
void **out_buf, size_t *out_bytes,
struct flb_filter_instance *f_ins,
struct flb_input_instance *i_ins,
void *filter_context,
struct flb_config *config)
{
int ret;
char *ret_val = NULL;
...
case FLB_FILTER_WASM_FMT_MSGPACK:
...
/* Execute WASM program */
ret_val = flb_wasm_call_function_format_msgpack(wasm, ctx->wasm_function_name,
tag, tag_len,
log_event.timestamp,
buf, buf_size);
...
case FLB_FILTER_WASM_FMT_MSGPACK:
/* msgpack found, pack it msgpack representation */
ret = flb_log_event_encoder_set_body_from_raw_msgpack(
&log_encoder,
ret_val,
strlen(ret_val));
...
msgpack模式下, 使用char*接收返回值; 如果返回的msgpack数据中间含有0x00字节, 后续调用flb_log_event_encoder_set_body_from_raw_msgpack时以strlen(ret_val)作为长度, 返回值会在第一个0x00处被截断。
wasm filter code:
char* c_filter(char* tag, int len, uint32_t sec, uint32_t nsec, char* record, int record_len)
{
...
mpack_log("data=%s, len=%d\n", record, record_len);
...
// time
mpack_write_cstr(&writer, "time");
sec=1739005140; nsec=743676544; // debug
mpack_write_timestamp(&writer, sec, nsec);
...
// 返回数据
if (size != strlen(msg_utf8)) {
// 逐字符输出16进制数据
printf("utf8 raw data: ");
for(int i = 0; i < size; i++) {
printf("%02x ", (unsigned char)msg_utf8[i]);
}
printf("\n");
printf("utf8 str data: ");
for(int i = 0; i < strlen(msg_utf8); i++) {
printf("%02x ", (unsigned char)msg_utf8[i]);
}
printf("\n");
printf("data=%s, len=%zu, strlen(data)=%lu, sec=%d, nsec=%d\n", msg_utf8, size, strlen(msg_utf8), sec, nsec);
}
return msg_utf8;
...
fluent-bit conf:
...
[INPUT]
name tail
path /tmp/*.log
tag cpu.local
[FILTER]
Name wasm
match *
# msgpack/json
Event_Format msgpack
WASM_Path c_filter.wasm
Function_Name c_filter
accessible_paths .
Wasm_Heap_Size 512000000
...
test:
echo 1 >> /tmp/test.log
fluent-bit output:
utf8 raw data: 83 a3 74 61 67 a1 32 a4 74 69 6d 65 d7 ff b1 4e 6a 00 67 a7 1c d4 a7 6d 65 73 73 61 67 65 81 a3 6c 6f 67 a1 31
utf8 str data: 83 a3 74 61 67 a1 32 a4 74 69 6d 65 d7 ff b1 4e 6a
data=��tag�2�time����Nj, len=37, strlen(data)=17, sec=1739005140, nsec=743676544
如果filter代码中注释掉time:
char* c_filter(char* tag, int len, uint32_t sec, uint32_t nsec, char* record, int record_len)
{
...
// time
//mpack_write_cstr(&writer, "time");
//sec=1739005140; nsec=743676544; // debug
//mpack_write_timestamp(&writer, sec, nsec);
...
继续test:
for j in {1..20}; do echo $j >> /tmp/test.log; done
fluent-bit output:
data=��log�1, len=7
data=��tag�2�message��log�log1�, len=25, strlen=28
[0] cpu.local: [[1738894225.067637732, {}], {"tag"=>"2", "message"=>{"log"=>"log1"}}]
The text was updated successfully, but these errors were encountered: