Skip to content

Commit

Permalink
Improve handling of closing WebSockets
Browse files Browse the repository at this point in the history
Closing WebSockets can be initiated from two sides (server and client).
This process involves a sort of handshake where both parties agree on
closing the connection before actually closing it. When the server is
reading from the socket and the client initiated closing the connection
this would be handled fine.

However, when the server initiated closing the connection, the process
could be complicated by a potentially blocking read call on the socket
in another Task. In this case there are two processes listening for
control frames on the socket and it's not known in advance who will get
to handle the client's reply.

This is solved by adding a CLOSING state to the WebSocket which can be
used to track who initiated closing the connection. Deferring handling
of the client's reply to the appropriate process.

This also prevents duplicate OPCODE_CLOSE messages from being sent by
the server when it initiates a the close and also replies to the
acknowledgement by the client when a `read` is watching the socket.
  • Loading branch information
bauglir committed Jul 27, 2017
1 parent eb16f72 commit b43c7b2
Showing 1 changed file with 55 additions and 23 deletions.
78 changes: 55 additions & 23 deletions src/WebSockets.jl
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export WebSocket,

const TCPSock = Base.TCPSocket

@enum ReadyState CONNECTED=0x1 CLOSING=0x2 CLOSED=0x3

""" Buffer writes to socket till flush (sock)"""
init_socket(sock) = Base.buffer_writes(sock)

Expand All @@ -50,12 +52,11 @@ data in a frame and unwrapping (and concatenating) incoming data.
type WebSocket
id::Int
socket::TCPSock
is_closed::Bool
sent_close::Bool
state::ReadyState

function WebSocket(id::Int,socket::TCPSock)
init_socket(socket)
new(id,socket, !isopen(socket), false)
new(id, socket, CONNECTED)
end
end

Expand Down Expand Up @@ -153,7 +154,7 @@ end

""" Write text data; will be sent as one frame."""
function Base.write(ws::WebSocket,data::String)
if ws.is_closed
if !isopen(ws)
@show ws
error("Attempted write to closed WebSocket\n")
end
Expand All @@ -162,7 +163,7 @@ end

""" Write binary data; will be sent as one frame."""
function Base.write(ws::WebSocket, data::Array{UInt8})
if ws.is_closed
if !isopen(ws)
@show ws
error("attempt to write to closed WebSocket\n")
end
Expand All @@ -188,26 +189,45 @@ send_pong(ws, data...) = write_pong(ws.socket, data...)
Send a close message.
"""
function Base.close(ws::WebSocket)
# Tell client to close connection
if !isopen(ws)
error("Attempt to close closed WebSocket")
end

# Ask client to acknowledge closing the connection
locked_write(ws.socket, true, "", OPCODE_CLOSE)
ws.is_closed = true

# Wait till client responds with an OPCODE_CLOSE
while true
wsf = read_frame(ws.socket)
# ALERT: stuff might get lost in ether here
is_control_frame(wsf) || continue
wsf.opcode == OPCODE_CLOSE || continue
break
ws.state = CLOSING

# Wait till the client responds with an OPCODE_CLOSE. This process is
# complicated by potential blocking reads on the WebSocket in other Tasks
# which may receive the response control frame. Synchronization of who is
# responsible for closing the underlying socket is done using the
# WebSocket's state. When this side initiates closing the connection it is
# responsible for cleaning up, when the other side initiates the close the
# read method is
#
# The exception handling is necessary as read_frame will error when the
# OPCODE_CLOSE control frame is received by a potentially blocking read in
# another Task
try
while ws.state === CLOSING
wsf = read_frame(ws.socket)
# ALERT: stuff might get lost in ether here
if is_control_frame(wsf) && (wsf.opcode == OPCODE_CLOSE)
ws.state = CLOSED
end
end

close(ws.socket)
catch exception
!isa(exception, EOFError) && rethrow(exception)
end
close(ws.socket)
end
"""
isopen(WebSocket)-> Bool
A WebSocket is closed if the underlying TCP socket closes, or if we send or
receive a close message.
"""
Base.isopen(ws::WebSocket) = !ws.is_closed && isopen(ws.socket)
Base.isopen(ws::WebSocket) = (ws.state === CONNECTED) && isopen(ws.socket)


""" Represents one (received) message frame."""
Expand Down Expand Up @@ -252,12 +272,24 @@ is_control_frame(msg::WebSocketFragment) = (msg.opcode & 0b0000_1000) > 0

""" Respond to pings, ignore pongs, respond to close."""
function handle_control_frame(ws::WebSocket,wsf::WebSocketFragment)

if wsf.opcode == OPCODE_CLOSE
# Reply with an empty CLOSE frame
locked_write(ws.socket, true, "", OPCODE_CLOSE)
ws.is_closed = true
close(ws.socket)
# A close OPCODE can be received for two reasons. Either the other side
# is initiating a disconnection, or the this side is (through a call to
# close on the WebSocket) and the client has replied that it is okay
# with closing the connection. This can be derived from the current
# state of the WebSocket
if ws.state !== CLOSING
# The other side initiated the disconnect, so the action must be
# acknowledged by replying with an empty CLOSE frame and cleaning
# up
locked_write(ws.socket, true, "", OPCODE_CLOSE)
close(ws.socket)
end

# In the other case the close method is expected to clean-up, which can
# be triggered by changing the state of the WebSocket
ws.state = CLOSED

throw(WebSocketClosedError())
elseif wsf.opcode == OPCODE_PING
write_pong(ws.socket,wsf.data)
Expand Down Expand Up @@ -317,7 +349,7 @@ data (contents/body/payload) of the message will be returned from this
function.
"""
function Base.read(ws::WebSocket)
if ws.is_closed
if !isopen(ws)
error("Attempt to read from closed WebSocket")
end
frame = read_frame(ws.socket)
Expand Down

0 comments on commit b43c7b2

Please sign in to comment.