Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

基于Wasm技术重新实现字符集转码遇到的问题 #9967

Open
opencmit2 opened this issue Feb 20, 2025 · 0 comments
Open

基于Wasm技术重新实现字符集转码遇到的问题 #9967

opencmit2 opened this issue Feb 20, 2025 · 0 comments

Comments

@opencmit2
Copy link
Contributor

现象:
输入数据文件为/tmp/test.log
for i in {1..20}; do echo $i >> /tmp/test.log; done

fluent-bit输出中, 有些数据行被合并, 比如:
...
[14] cpu.local: [[1738889482.276986343, {}], {"tag"=>"2", "time"=>(ext: -1)"B\x09\xe7\x9cg\xa5Y\x0a", "message"=>{"log"=>"15"}}]
[15] cpu.local: [[1738889482.276986752, {}], {"tag"=>"2", "time"=>(ext: -1)"B\x09\xee\xdd\x00\x00\x00\x02", [(ext: 0)"g\xa5Y\x0a\x10\x82}8", {}]=>{"tag"=>"2", "time"=>(ext: -1)"B\x09\xf4\xe0g\xa5Y\x0a", "message"=>{"log"=>"17"}}}]
[16] cpu.local: [[1738889482.276987641, {}], {"tag"=>"2", "time"=>(ext: -1)"B\x09\xfb\xe4g\xa5Y\x0a", "message"=>{"log"=>"18"}}]
...

分析输出数据, time中有些byte是0x00.

fluent-bit代码: plugins/filter_wasm/filter_wasm.c
/* cb_filter callback */
static int cb_wasm_filter(const void *data, size_t bytes,
const char *tag, int tag_len,
void **out_buf, size_t *out_bytes,
struct flb_filter_instance *f_ins,
struct flb_input_instance *i_ins,
void *filter_context,
struct flb_config *config)
{
int ret;
char ret_val = NULL;
...
case FLB_FILTER_WASM_FMT_MSGPACK:
...
/
Execute WASM program /
ret_val = flb_wasm_call_function_format_msgpack(wasm, ctx->wasm_function_name,
tag, tag_len,
log_event.timestamp,
buf, buf_size);
...
case FLB_FILTER_WASM_FMT_MSGPACK:
/
msgpack found, pack it msgpack representation */
ret = flb_log_event_encoder_set_body_from_raw_msgpack(
&log_encoder,
ret_val,
strlen(ret_val));
...

msgpack模式下, 使用char*接收返回值; 如果ret_val返回值中间有0x00, 后续调用flb_log_event_encoder_set_body_from_raw_msgpack时, 传入的参数是strlen(ret_val), 则返回值被截断;

wasm filter code:
char* c_filter(char* tag, int len, uint32_t sec, uint32_t nsec, char* record, int record_len)
{
...
mpack_log("data=%s, len=%d\n", record, record_len);
...
// time
mpack_write_cstr(&writer, "time");
sec=1739005140; nsec=743676544; // debug
mpack_write_timestamp(&writer, sec, nsec);
...
// 返回数据
if (size != strlen(msg_utf8)) {
// 逐字符输出16进制数据
printf("utf8 raw data: ");
for(int i = 0; i < size; i++) {
printf("%02x ", (unsigned char)msg_utf8[i]);
}
printf("\n");
printf("utf8 str data: ");
for(int i = 0; i < strlen(msg_utf8); i++) {
printf("%02x ", (unsigned char)msg_utf8[i]);
}
printf("\n");
printf("data=%s, len=%zu, strlen(data)=%lu, sec=%d, nsec=%d\n", msg_utf8, size, strlen(msg_utf8), sec, nsec);
}
return msg_utf8;
...

fluent-bit conf:
...
[INPUT]
name tail
path /tmp/*.log
tag cpu.local

[FILTER]
Name wasm
match *
# msgpack/json
Event_Format msgpack
WASM_Path c_filter.wasm
Function_Name c_filter
accessible_paths .
Wasm_Heap_Size 512000000
...

test:

echo 1 >> /tmp/test.log; done

fluent-bit output:
utf8 raw data: 83 a3 74 61 67 a1 32 a4 74 69 6d 65 d7 ff b1 4e 6a 00 67 a7 1c d4 a7 6d 65 73 73 61 67 65 81 a3 6c 6f 67 a1 31
utf8 str data: 83 a3 74 61 67 a1 32 a4 74 69 6d 65 d7 ff b1 4e 6a
data=��tag�2�time����Nj, len=37, strlen(data)=17, sec=1739005140, nsec=743676544

如果filter代码中注释掉time:
char* c_filter(char* tag, int len, uint32_t sec, uint32_t nsec, char* record, int record_len)
{
...
// time
//mpack_write_cstr(&writer, "time");
//sec=1739005140; nsec=743676544; // debug
//mpack_write_timestamp(&writer, sec, nsec);
...

继续test:

for j in {1..20}; do echo $j >> /tmp/test.log; done

fluent-bit output:
data=��log�1, len=7
data=��tag�2�message��log�log1�, len=25, strlen=28
[0] cpu.local: [[1738894225.067637732, {}], {"tag"=>"2", "message"=>{"log"=>"log1"}}]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant