diff --git a/src/WebSockets.jl b/src/WebSockets.jl index 0a0f7a8..e9a2adc 100644 --- a/src/WebSockets.jl +++ b/src/WebSockets.jl @@ -36,6 +36,8 @@ export WebSocket, const TCPSock = Base.TCPSocket +@enum ReadyState CONNECTED=0x1 CLOSING=0x2 CLOSED=0x3 + """ Buffer writes to socket till flush (sock)""" init_socket(sock) = Base.buffer_writes(sock) @@ -50,12 +52,11 @@ data in a frame and unwrapping (and concatenating) incoming data. type WebSocket id::Int socket::TCPSock - is_closed::Bool - sent_close::Bool + state::ReadyState function WebSocket(id::Int,socket::TCPSock) init_socket(socket) - new(id,socket, !isopen(socket), false) + new(id, socket, CONNECTED) end end @@ -184,7 +185,7 @@ end """ Write text data; will be sent as one frame.""" function Base.write(ws::WebSocket,data::String) - if ws.is_closed + if !isopen(ws) @show ws error("Attempted write to closed WebSocket\n") end @@ -193,7 +194,7 @@ end """ Write binary data; will be sent as one frame.""" function Base.write(ws::WebSocket, data::Array{UInt8}) - if ws.is_closed + if !isopen(ws) @show ws error("attempt to write to closed WebSocket\n") end @@ -219,26 +220,45 @@ send_pong(ws, data...) = write_pong(ws.socket, data...) Send a close message. """ function Base.close(ws::WebSocket) - # Tell client to close connection + if !isopen(ws) + error("Attempt to close closed WebSocket") + end + + # Ask client to acknowledge closing the connection locked_write(ws.socket, true, "", OPCODE_CLOSE) - ws.is_closed = true - - # Wait till client responds with an OPCODE_CLOSE - while true - wsf = read_frame(ws.socket) - # ALERT: stuff might get lost in ether here - is_control_frame(wsf) || continue - wsf.opcode == OPCODE_CLOSE || continue - break + ws.state = CLOSING + + # Wait till the client responds with an OPCODE_CLOSE. This process is + # complicated by potential blocking reads on the WebSocket in other Tasks + # which may receive the response control frame. Synchronization of who is + # responsible for closing the underlying socket is done using the + # WebSocket's state. When this side initiates closing the connection it is + # responsible for cleaning up, when the other side initiates the close the + # read method is + # + # The exception handling is necessary as read_frame will error when the + # OPCODE_CLOSE control frame is received by a potentially blocking read in + # another Task + try + while ws.state === CLOSING + wsf = read_frame(ws.socket) + # ALERT: stuff might get lost in ether here + if is_control_frame(wsf) && (wsf.opcode == OPCODE_CLOSE) + ws.state = CLOSED + end + end + + close(ws.socket) + catch exception + !isa(exception, EOFError) && rethrow(exception) end - close(ws.socket) end """ isopen(WebSocket)-> Bool A WebSocket is closed if the underlying TCP socket closes, or if we send or receive a close message. """ -Base.isopen(ws::WebSocket) = !ws.is_closed && isopen(ws.socket) +Base.isopen(ws::WebSocket) = (ws.state === CONNECTED) && isopen(ws.socket) """ Represents one (received) message frame.""" @@ -283,12 +303,34 @@ is_control_frame(msg::WebSocketFragment) = (msg.opcode & 0b0000_1000) > 0 """ Respond to pings, ignore pongs, respond to close.""" function handle_control_frame(ws::WebSocket,wsf::WebSocketFragment) - if wsf.opcode == OPCODE_CLOSE - # Reply with an empty CLOSE frame - locked_write(ws.socket, true, "", OPCODE_CLOSE) - ws.is_closed = true - close(ws.socket) + # A close OPCODE can be received for two reasons. Either the other side + # is initiating a disconnection, or the this side is (through a call to + # close on the WebSocket) and the client has replied that it is okay + # with closing the connection. This can be derived from the current + # state of the WebSocket + if ws.state !== CLOSING + # The other side initiated the disconnect, so the action must be + # acknowledged by replying with an empty CLOSE frame and cleaning + # up + try + locked_write(ws.socket, true, "", OPCODE_CLOSE) + catch exception + # On sudden disconnects, the other side may be gone before the + # close acknowledgement can be sent. This will cause an + # ArgumentError to be thrown due to the underlying stream being + # closed. These are swallowed here and will be replaced by a + # WebSocketClosedError below + !isa(exception, ArgumentError) && rethrow(exception) + end + + close(ws.socket) + end + + # In the other case the close method is expected to clean-up, which can + # be triggered by changing the state of the WebSocket + ws.state = CLOSED + throw(WebSocketClosedError()) elseif wsf.opcode == OPCODE_PING write_pong(ws.socket,wsf.data) @@ -348,7 +390,7 @@ data (contents/body/payload) of the message will be returned from this function. """ function Base.read(ws::WebSocket) - if ws.is_closed + if !isopen(ws) error("Attempt to read from closed WebSocket") end frame = read_frame(ws.socket)