Websocket API: messages truncated to 4KB #357
Comments
Are you sure this isn't a limitation of your tool? I tried to reproduce this with a
# @TEST-GROUP: web-socket
#
# @TEST-PORT: BROKER_WEB_SOCKET_PORT
#
# @TEST-EXEC: btest-bg-run node "broker-node --config-file=../node.cfg"
# @TEST-EXEC: btest-bg-run recv "python3 ../recv.py >recv.out"
# @TEST-EXEC: $SCRIPTS/wait-for-file recv/ready 15 || (btest-bg-wait -k 1 && false)
#
# @TEST-EXEC: btest-bg-run send "python3 ../send.py"
#
# @TEST-EXEC: $SCRIPTS/wait-for-file recv/done 30 || (btest-bg-wait -k 1 && false)
# @TEST-EXEC: btest-diff recv/recv.out
#
# @TEST-EXEC: btest-bg-wait -k 1
@TEST-START-FILE node.cfg
broker {
  disable-ssl = true
}
topics = ["/test"]
verbose = true
@TEST-END-FILE
@TEST-START-FILE recv.py
import asyncio, websockets, os, time, json, sys
from datetime import datetime
ws_port = os.environ['BROKER_WEB_SOCKET_PORT'].split('/')[0]
ws_url = f'ws://localhost:{ws_port}/v1/messages/json'
async def do_run():
    # Try up to 30 times.
    connected = False
    for i in range(30):
        try:
            ws = await websockets.connect(ws_url)
            connected = True
            # send filter and wait for ack
            await ws.send('["/test"]')
            ack_json = await ws.recv()
            ack = json.loads(ack_json)
            if not 'type' in ack or ack['type'] != 'ack':
                print('*** unexpected ACK from server:')
                print(ack_json)
                sys.exit()
            # tell btest to start the sender now
            with open('ready', 'w') as f:
                f.write('ready')
            # the message must be valid JSON
            msg_json = await ws.recv()
            msg = json.loads(msg_json)
            # pretty-print to stdout (redirected to recv.out)
            print(json.dumps(msg, indent=2))
            # tell btest we're done
            with open('done', 'w') as f:
                f.write('done')
            await ws.close()
            sys.exit()
        except:
            if not connected:
                print(f'failed to connect to {ws_url}, try again', file=sys.stderr)
                time.sleep(1)
            else:
                sys.exit()

loop = asyncio.get_event_loop()
loop.run_until_complete(do_run())
@TEST-END-FILE
@TEST-START-FILE send.py
import asyncio, websockets, os, json, sys
ws_port = os.environ['BROKER_WEB_SOCKET_PORT'].split('/')[0]
ws_url = f'ws://localhost:{ws_port}/v1/messages/json'
msg_data = []
for i in range(200):
    msg_data.append({
        '@data-type': 'count',
        'data': i
    })

# Message with timestamps using the local time zone.
msg = {
    'type': 'data-message',
    'topic': '/test',
    '@data-type': "vector",
    'data': msg_data
}

async def do_run():
    async with websockets.connect(ws_url) as ws:
        await ws.send('[]')
        await ws.recv()  # wait for ACK
        msg_str = json.dumps(msg)
        if len(msg_str) < 4096:
            raise RuntimeError('message is too short')
        await ws.send(msg_str)
        resp = await ws.recv()
        print(resp)
        await ws.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(do_run())
@TEST-END-FILE
Here's the generated JSON as text file: tmp.txt.
Small correction: the text file was actually from the print in the receiver after pretty-printing. However, even if I do pretty print at the sender (
Hi @Neverlord
You're right, there is a problem with I was using On the other hand, my Golang code is sending the whole message (also confirmed with wireshark) and instead of responding with an error, broker is just terminating the connection.
I have some sample code here: https://github.com/simeonmiteff/broker-issue-357 (you can build this with
This is what it looks like with <4KB messages:
$ ./broker-issue-357 100
2023/05/28 17:25:52 connected to remote endpoint with UUID=55791c58-dae3-538e-95db-0019faa2ef24 version=2.5.0-dev
2023/05/28 17:25:52 sent 100 counts
2023/05/28 17:25:52 received 100 counts
2023/05/28 17:25:53 sent 100 counts
2023/05/28 17:25:53 received 100 counts
^C2023/05/28 17:25:53 received signal, shutting down websocket
2023/05/28 17:25:54 shutting down websocket
...and the zeek script:
$ zeek listen.zeek
peer added, [id=f2646bce-396a-599d-bd6e-44721e5ef288, network=[address=::1, bound_port=47730/tcp]]
receiver got ping with 100 counts
receiver got ping with 100 counts
peer lost, [id=f2646bce-396a-599d-bd6e-44721e5ef288, network=[address=::1, bound_port=47730/tcp]]
And with >4KB messages:
$ ./broker-issue-357
2023/05/28 17:24:42 connected to remote endpoint with UUID=2010a6e7-2145-50d3-90f7-ecc8f269f16e version=2.5.0-dev
2023/05/28 17:24:42 sent 200 counts
2023/05/28 17:24:42 websocket: close 1006 (abnormal closure): unexpected EOF
I can see broker closing the socket (
Your Python test suggests I have a different bug. That would mean the message sent via the Golang library is not well-formed and is hitting some code path in broker that silently drops the connection (still a bug!). But then I would expect it to fail for <4KB messages as well, and those work just fine...
Can you share the JSON that your Go program generates? That would make trying to reproduce this issue easier.
@Neverlord no problem: go.json.txt This was copied out of wireshark, so I'm fairly certain it is exactly what broker sees.
Just an idea, since you already have looked at this in Wireshark: is the Go library sending this entire JSON as a single WebSocket frame? Broker expects a full JSON message per frame. So if your library splits up the messages into multiple frames, that could cause Broker to think it deals with a broken client.
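To make the frame question concrete, here is a minimal sketch of how a sender may split one text message into several WebSocket frames under RFC 6455: the first frame carries the text opcode (0x1), every later frame carries the continuation opcode (0x0), and only the last frame sets the FIN bit. The function name `fragment_text_message` and the tuple layout `(fin, opcode, payload)` are assumptions made for illustration; this is not how gorilla/websocket or Broker represent frames internally.

```python
OPCODE_CONTINUATION = 0x0  # RFC 6455: continuation frame
OPCODE_TEXT = 0x1          # RFC 6455: text frame


def fragment_text_message(payload, max_fragment=4096):
    """Split one text message into (fin, opcode, data) tuples.

    Hypothetical helper: it models only the FIN bit and the opcode,
    not masking or the on-the-wire header layout.
    """
    data = payload.encode("utf-8")
    # An empty message is still sent as a single, final text frame.
    chunks = [data[i:i + max_fragment]
              for i in range(0, len(data), max_fragment)] or [b""]
    frames = []
    for index, chunk in enumerate(chunks):
        fin = index == len(chunks) - 1
        opcode = OPCODE_TEXT if index == 0 else OPCODE_CONTINUATION
        frames.append((fin, opcode, chunk))
    return frames


if __name__ == "__main__":
    for fin, opcode, data in fragment_text_message("x" * 5000):
        print(f"fin={fin} opcode={opcode:#x} bytes={len(data)}")
```

A receiver that parses JSON as soon as it sees the first frame would only see the first 4096 bytes of such a message, which would look like a truncated or malformed document.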
@Neverlord indeed, the Go websocket library ( The WebSocket RFC requires endpoints to support receiving fragmented messages, so (respectfully) I'd say not supporting this in broker is a bug:
Meanwhile I'll see if there is a way to get
Indeed The gorilla/websocket documentation has a section that describes this trade-off quite well (note that frame size limits are not a consideration since fragmentation takes care of that):
Ah, I didn't even think about fragmented frames. But it seems like you're right, there's been a bug in how the WebSocket implementation handles opcodes of continuation frames. Can you try the branch
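For readers unfamiliar with continuation frames, the receiving side can be sketched roughly as follows: buffer the payload of the initial text frame, append every continuation frame, and hand the message to the JSON layer only once a frame with the FIN bit arrives. This is an illustrative sketch under assumptions, not the actual Broker or CAF code; the name `reassemble` and the `(fin, opcode, data)` tuples are hypothetical, and binary frames are left out for brevity.

```python
OPCODE_CONTINUATION = 0x0  # RFC 6455: continuation frame
OPCODE_TEXT = 0x1          # RFC 6455: text frame


def reassemble(frames):
    """Yield complete text messages from (fin, opcode, data) tuples."""
    buffer = None  # None means "no fragmented message in progress"
    for fin, opcode, data in frames:
        if opcode >= 0x8:
            # Control frames (close, ping, pong) may be interleaved with
            # the fragments of a message; this sketch simply skips them.
            continue
        if opcode == OPCODE_TEXT:
            if buffer is not None:
                raise ValueError("new text frame inside a fragmented message")
            buffer = bytearray(data)
        elif opcode == OPCODE_CONTINUATION:
            if buffer is None:
                raise ValueError("continuation frame without an initial frame")
            buffer.extend(data)
        else:
            raise ValueError(f"unsupported opcode {opcode:#x}")
        if fin:
            # Decode only after the last fragment: a multi-byte UTF-8
            # character may be split across two frames.
            yield buffer.decode("utf-8")
            buffer = None


if __name__ == "__main__":
    raw = b'{"type": "data-message", "topic": "/test"}'
    parts = [(False, OPCODE_TEXT, raw[:10]),
             (True, OPCODE_CONTINUATION, raw[10:])]
    print(list(reassemble(parts)))
```

The key design point is that the opcode of a continuation frame says nothing about the message type; the receiver has to remember the type from the first frame of the sequence.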
@Neverlord the
I also think actor-framework/actor-framework@d3cfa1f may have introduced a new bug, I'm now seeing timestamp strings in the
{
  "type": "data-message",
  "topic": "/topic/test",
  "@data-type": "vector",
  "data": [
    {
      "@data-type": "count",
      "data": 1
    },
    {
      "@data-type": "count",
      "data": 1
    },
    {
      "@data-type": "vector",
      "data": [
        {
          "@data-type": "string",
          "data": "pong"
        },
        {
          "@data-type": "vector",
          "data": [
            {
              "@data-type": "vector",
              "data": [
                {
                  "@data-type": "count",
                  "data": 100000
                }
              ]
            }
          ]
        },
        {
          "@data-type": "vector",
          "data": [
            {
              "@data-type": "vector",
              "data": [
                {
                  "@data-type": "count",
                  "data": 1
                },
                {
                  "@data-type": "timestamp",
                  "data": "2023-05-30T09:24:34.796706048+10:00"
                }
              ]
            }
          ]
        }
      ]
    }
  ]
}
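The timestamp in the payload above carries nine fractional digits and a UTC offset, which is more precision than Python's `datetime` can hold. A client that wants to consume such strings could parse them along these lines. This is only a sketch: the accepted grammar is an assumption based on the single example in this thread, not on a Broker specification, and `parse_timestamp` is a hypothetical helper.

```python
import re
from datetime import datetime, timedelta, timezone

# Assumed shape: date, 'T', time, optional 1-9 fractional digits,
# optional 'Z' or +hh:mm / -hh:mm offset.
_TIMESTAMP = re.compile(
    r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})"
    r"(?:\.(\d{1,9}))?"
    r"(Z|[+-]\d{2}:\d{2})?$"
)


def parse_timestamp(text):
    """Return (datetime with whole seconds, nanoseconds within that second)."""
    match = _TIMESTAMP.match(text)
    if match is None:
        raise ValueError(f"unrecognized timestamp: {text!r}")
    base, fraction, offset = match.groups()
    # Right-pad the fraction so ".5" means 500000000 ns.
    nanos = int(fraction.ljust(9, "0")) if fraction else 0
    tz = None
    if offset == "Z":
        tz = timezone.utc
    elif offset:
        sign = 1 if offset[0] == "+" else -1
        hours, minutes = int(offset[1:3]), int(offset[4:6])
        tz = timezone(sign * timedelta(hours=hours, minutes=minutes))
    moment = datetime.strptime(base, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=tz)
    return moment, nanos


if __name__ == "__main__":
    print(parse_timestamp("2023-05-30T09:24:34.796706048+10:00"))
```

Returning the nanoseconds separately avoids silently truncating the value to microseconds.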
@Neverlord if it helps, I modified your btest python client script to send fragments (payload is now an encoded zeek event), and it does the same thing as the Golang code:
import asyncio, websockets, os, json, sys

ws_port = os.environ['BROKER_WEB_SOCKET_PORT'].split('/')[0]
ws_url = f'ws://localhost:{ws_port}/v1/messages/json'

msg_data = []
for i in range(200):
    msg_data.append({
        '@data-type': 'count',
        'data': i
    })

# Message with timestamps using the local time zone.
msg = {
    'type': 'data-message',
    'topic': '/topic/test',
    '@data-type': "vector",
    'data': [
        {"@data-type": "count", "data": 1},
        {"@data-type": "count", "data": 1},
        {"@data-type": "vector", "data": [
            {"@data-type": "string", "data": "ping"},
            {"@data-type": "vector", "data":[
                {"@data-type": "vector", "data": msg_data},
            ]
            }
        ]
        }
    ]
}

async def do_run():
    async with websockets.connect(ws_url) as ws:
        await ws.send('[]')
        await ws.recv()  # wait for ACK
        msg_str = json.dumps(msg)
        if len(msg_str) < 4096:
            raise RuntimeError('message is too short')
        frames = []
        while len(msg_str) > 4096:
            frames.append(msg_str[:4096])
            msg_str = msg_str[4096:]
        if len(msg_str) > 0:
            frames.append(msg_str)
        await ws.send(frames)
        resp = await ws.recv()
        print(resp)
        await ws.close()

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(do_run())
except KeyboardInterrupt:
    pass
It's a deliberate change to have unambiguous timestamps with resolutions from seconds to nanoseconds, but let's discuss this someplace else. I've dropped the commit related to the timestamps from the branch, so it shouldn't affect the testing. And sorry for not testing the backport properly. It should work now and I've also added the Python btest as a regression test to the branch. Mind trying again?
@Neverlord yup - that fixes it 🎉. Thank you! Re: timestamp format, I note #346 and it makes sense, I was just unprepared for it 😁
* origin/topic/neverlord/gh-357: Fix handling of fragmented WebSocket frames
Broker's websocket API seems to be truncating incoming JSON messages to 4KB.
Using this zeek script:
And testing by hand with websocat and a 4097 byte JSON encoding of an event:
Removing one of the zeros from the last count brings it down to 4096 bytes:
...it works and the zeek script outputs "receiver got ping with 103 counts".
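To probe the 4096-byte boundary described above without hand-editing payloads, one could generate events of increasing size and measure their encoded length. The sketch below reuses the event layout from the fragmenting Python script in this thread; the helper names and the compact JSON separators are assumptions, and the resulting sizes will not match the original hand-written 4097-byte payload exactly.

```python
import json


def build_ping_event(num_counts, topic="/topic/test"):
    """Build a 'ping' event whose argument is a vector of num_counts counts."""
    counts = [{"@data-type": "count", "data": i} for i in range(num_counts)]
    return {
        "type": "data-message",
        "topic": topic,
        "@data-type": "vector",
        "data": [
            {"@data-type": "count", "data": 1},
            {"@data-type": "count", "data": 1},
            {"@data-type": "vector", "data": [
                {"@data-type": "string", "data": "ping"},
                {"@data-type": "vector", "data": [
                    {"@data-type": "vector", "data": counts},
                ]},
            ]},
        ],
    }


def encoded_size(num_counts):
    """Size in bytes of the compact UTF-8 JSON encoding of the event."""
    text = json.dumps(build_ping_event(num_counts), separators=(",", ":"))
    return len(text.encode("utf-8"))


def smallest_count_exceeding(limit=4096):
    """Smallest number of counts whose encoding is larger than limit bytes."""
    n = 0
    while encoded_size(n) <= limit:
        n += 1
    return n


if __name__ == "__main__":
    n = smallest_count_exceeding()
    print(n - 1, encoded_size(n - 1))
    print(n, encoded_size(n))
```

Sending the two events on either side of the boundary makes it easy to tell a size limit in the server apart from a framing issue in the client.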