Fix WebSocket concurrent sends #1197
This LGTM. An alternative to consider is adding a write lock to the WebSocket and holding it for the whole of writeframe; that would avoid allocating an additional IOBuffer on every writeframe call. Network-related changes can be tricky to benchmark, but I suspect the lock might be faster too.
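A minimal sketch of the write-lock alternative, assuming a simplified frame made of a 2-byte header followed by the payload. `LockedSocket` and `writeframe_locked` are hypothetical names for illustration, not HTTP.jl's API. Holding one `ReentrantLock` across every write belonging to a frame keeps frames from different tasks from interleaving, without staging them in an extra buffer, at the cost of serializing all senders on that socket.

```julia
# Hypothetical stand-in for a WebSocket: the underlying IO plus a write lock.
struct LockedSocket{T<:IO}
    io::T
    writelock::ReentrantLock
end

LockedSocket(io::IO) = LockedSocket(io, ReentrantLock())

# Write a simplified frame (2-byte big-endian header, then payload) while
# holding the lock for the whole frame, so no other task can write in between.
function writeframe_locked(s::LockedSocket, header::UInt16, payload::AbstractVector{UInt8})
    lock(s.writelock) do
        n = write(s.io, hton(header))
        n += write(s.io, payload)
        return n
    end
end

# Usage: 8 tasks share one socket, each sending 25 five-byte frames.
s = LockedSocket(IOBuffer())
@sync for i in 1:8
    Threads.@spawn for _ in 1:25
        writeframe_locked(s, UInt16(i), fill(UInt8(i), 3))
    end
end
```

Every frame in `s.io` ends up contiguous: header byte, then three payload bytes with the same value.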
OK, before merging I can try the comparison using a lock instead. I think it should work; let's see the benchmarks...
I've put a …
Yeah, it makes sense that the total throughput of concurrent writes would go down with the lock. I guess that's a shift from how I've traditionally used websockets, where it's always a single send-and-receive. So, all in all, let's go with the IOBuffer-per-writeframe approach. Is this PR ready to merge, then? (Thanks for trying the ReentrantLock approach; I appreciate it!)
The buffer could possibly be a task-local variable reused between calls, but let's merge this to get the fix in first.
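A stdlib-only sketch of the task-local buffer idea, using Base's `task_local_storage()` instead of an extra package. `frame_buffer` and the key `:frame_scratch_buffer` are hypothetical names chosen for illustration. Each task lazily creates its own `IOBuffer` on first use and gets the same one back on later calls.

```julia
# Return this task's scratch IOBuffer, creating it on first use.
# task_local_storage() is Base's per-task IdDict; the key below is a
# hypothetical name for this sketch.
frame_buffer() = get!(() -> IOBuffer(), task_local_storage(), :frame_scratch_buffer)::IOBuffer

buf = frame_buffer()
same_task_reuses = frame_buffer() === buf                  # true: same task, same buffer
other_task_shares = fetch(@async frame_buffer()) === buf   # false: new task, new buffer
```

This avoids the extra dependency at the cost of an IdDict lookup on every call.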
I tried task-local storage, but the results were not what I expected: memory allocation seems to get worse. Below is what I tried. The modified writeframe:
using TaskLocalValues  # provides TaskLocalValue
const frameio = TaskLocalValue{IOBuffer}(() -> IOBuffer())
function writeframe(io::IO, x::Frame)
    n = write(frameio[], hton(uint16(x.flags)))
    if x.extendedlen !== nothing
        n += write(frameio[], hton(x.extendedlen))
    end
    if x.mask != EMPTY_MASK
        n += write(frameio[], UInt32(x.mask))
    end
    pl = x.payload
    # manually unroll a few known type cases to help the compiler
    if pl isa Vector{UInt8}
        n += write(frameio[], pl)
    elseif pl isa Base.CodeUnits{UInt8,String}
        n += write(frameio[], pl)
    else
        n += write(frameio[], pl)
    end
    # send the whole frame with a single write so concurrent senders cannot interleave
    write(io.io, take!(frameio[]))
    return n
end
# bench.jl
using HTTP.WebSockets
const COUNT = 100
# send the same message 100 times on ws
function write_message(ws, msg)
    for i in 1:100
        send(ws, msg)
    end
end
# spawn COUNT + 1 tasks that all send on the same socket concurrently
function client_twin(ws)
    for count in 0:COUNT
        @async write_message(ws, count)
    end
end
function serve()
    server = WebSockets.listen!("127.0.0.1", 8081) do ws
        client_twin(ws)
        receive(ws)
    end
    wait(server)
end
# client: after the first message, time the receipt of COUNT * 100 more
function write()
    WebSockets.open("ws://127.0.0.1:8081") do ws
        try
            s = receive(ws)
            t = time()
            count = 0
            while count < COUNT * 100
                s = receive(ws)
                count += 1
            end
            delta = time() - t
            println("delta time: $delta")
            close(ws)
        catch e
            @error "ws client: $e"
        end
    end
end
srvtask = @async serve()
julia -i bench.jl
julia> write()
julia> @time write()
# writeframe current version with additional buffer
delta time: 0.13498997688293457
0.139230 seconds (142.53 k allocations: 11.041 MiB)
# writeframe with TaskLocalValue
delta time: 0.14266705513000488
0.146890 seconds (183.03 k allocations: 11.225 MiB)
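The two timed runs above compare roughly as follows; each figure comes from a single run, so small differences may be noise:

```latex
\begin{aligned}
\text{delta time:}\quad & \frac{0.14267\ \text{s}}{0.13499\ \text{s}} \approx 1.057 \quad (+5.7\%)\\
\text{allocations:}\quad & \frac{183.03\text{k}}{142.53\text{k}} \approx 1.284 \quad (+28.4\%)\\
\text{memory:}\quad & \frac{11.225\ \text{MiB}}{11.041\ \text{MiB}} \approx 1.017 \quad (+1.7\%)
\end{aligned}
```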
and using
for the master branch and
with those changes to your benchmark.
Nice! seekstart makes the difference. If adding the TaskLocalValue dependency is not a problem, I could open a new PR.
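One way to reuse a scratch buffer with `seekstart` instead of `take!`, sketched under the assumption of a simplified frame (2-byte header plus payload); `writeframe_reuse` is a hypothetical name, not the exact change discussed here. `take!` on a writable `IOBuffer` typically hands its internal array to the caller, so a reused buffer has to grow again from empty on the next frame. `truncate(buf, 0)` plus `seekstart` keeps the already-allocated memory, and `write(io, buf)` copies the staged bytes out in one call.

```julia
# Write one simplified frame (2-byte big-endian header + payload) to `io`,
# staging it in `scratch` so the frame goes out in a single write call.
function writeframe_reuse(io::IO, scratch::IOBuffer, header::UInt16, payload::AbstractVector{UInt8})
    truncate(scratch, 0)             # drop the previous frame's bytes, keep the allocated memory
    n = write(scratch, hton(header))
    n += write(scratch, payload)
    write(io, seekstart(scratch))    # rewind, then copy all staged bytes to io
    return n
end

out = IOBuffer()
scratch = IOBuffer()
n1 = writeframe_reuse(out, scratch, 0x0102, UInt8[1, 2, 3, 4, 5])
n2 = writeframe_reuse(out, scratch, 0x0304, UInt8[9])  # shorter frame: no stale bytes leak through
```

Calling `truncate` before each frame matters: without it, a shorter frame would leave bytes from the previous, longer one at the end of the buffer.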
Fixes #1196