diff --git a/README.md b/README.md index a74ddd4..4bc137a 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,9 @@ *Release version*: -[![WebSockets](http://pkg.julialang.org/badges/WebSockets_0.6.svg)](http://pkg.julialang.org/?pkg=WebSockets&ver=0.6) [![Build Status](https://travis-ci.org/JuliaWeb/WebSockets.jl.svg)](https://travis-ci.org/JuliaWeb/WebSockets.jl) +[![Coverage Status](https://img.shields.io/coveralls/JuliaWeb/WebSockets.jl.svg)] (https://coveralls.io/r/JuliaWeb/WebSockets.jl)a --> Test coverage 96% @@ -12,8 +12,8 @@ Test coverage 96% [![WebSockets](http://pkg.julialang.org/badges/WebSockets_0.6.svg?branch?master)](http://pkg.julialang.org/?pkg=WebSockets&ver=0.6) [![Build Status](https://travis-ci.org/JuliaWeb/WebSockets.jl.svg?branch=master)](https://travis-ci.org/JuliaWeb/WebSockets.jl) - + Test coverage 96% @@ -21,28 +21,46 @@ Test coverage 96% Server and client side [Websockets](https://tools.ietf.org/html/rfc6455) protocol in Julia. WebSockets is a small overhead message protocol layered over [TCP](https://tools.ietf.org/html/rfc793). It uses HTTP(S) for establishing the connections. ## Getting started -Copy this into Julia: +In the package manager, add WebSockets. Then [paste](https://docs.julialang.org/en/v1/stdlib/REPL/index.html#The-Julian-mode-1) this into a REPL: ```julia -(v1.0) pkg> add WebSockets -julia> using WebSockets -julia> # define what to do with http requests, and with websocket upgrades. -julia> serverWS = ServerWS((r) -> WebSockets.Response(200, "OK"), - (ws_server) -> (writeguarded(ws_server, "Hello"); - readguarded(ws_server))); -julia> # serve on socket 8000, but in a coroutine so we can do other things too. -julia> @async WebSockets.serve(serverWS, 8000) -julia> # We ask for a http response, now as our alter ego the client. +julia> using WebSockets, Logging + +julia> serverWS = ServerWS(handler = (req) -> WebSockets.Response(200), wshandler = (ws_server) -> (writeguarded(ws_server, "Hello"); readguarded(ws_server))) +ServerWS(handler=#3(req), wshandler=#4(ws_server), logger=Base.DevNull()) + +julia> import Logging.shouldlog + +julia> shouldlog(::ConsoleLogger, level, _module, group, id) = _module != WebSockets.HTTP.Servers +shouldlog (generic function with 4 methods) + +julia> ta = @async WebSockets.serve(serverWS, port = 8000) +Task (runnable) @0x000000000fc91cd0 + julia> WebSockets.HTTP.get("http://127.0.0.1:8000") -julia> # Talk to ourselves! Print the first response in blue, then hang up. +HTTP.Messages.Response: +""" +HTTP/1.1 200 OK +Transfer-Encoding: chunked + +""" + julia> WebSockets.open("ws://127.0.0.1:8000") do ws_client - data, success = readguarded(ws_client) - if success - printstyled(color=:blue, String(data)) - end - end -julia> # Tell ourselves, the server in a different coroutine: we can stop listening now. -julia> put!(serverWS.in, "x") + data, success = readguarded(ws_client) + if success + println(stderr, ws_client, " received:", String(data)) + end + end; +WebSocket(client, CONNECTED) received:Hello + +WARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :queued + +julia> put!(serverWS.in, "close!") +"close!" + +julia> ta +Task (done) @0x000000000fc91cd0 + ``` More things to do: Access inline documentation and have a look at the examples folder. The testing files demonstrate a variety of uses. Benchmarks show examples of websockets and servers running on separate processes, as oposed to asyncronous tasks. diff --git a/logutils/logutils_ws.jl b/logutils/logutils_ws.jl index 150c0c7..bd07d41 100644 --- a/logutils/logutils_ws.jl +++ b/logutils/logutils_ws.jl @@ -339,15 +339,15 @@ end function _limlen(data::Union{Vector{UInt8}, Vector{Float64}}) le = length(data) maxlen = 12 # elements, not characters - if le < maxlen - return string(data) + if le < maxlen + return string(data) else adds = " ..... " addlen = 2 truncat = 2 * div(maxlen, 3) tail = maxlen - truncat - addlen - 1 return string(data[1:truncat])[1:end-1] * adds * string(data[end-tail:end])[7:end] - end + end end @@ -358,12 +358,12 @@ _tg() = Dates.Time(now()) "For use in show(io::IO, obj) methods. Hook into this logger's dispatch mechanism." function directto_abstractdevice(io::IO, obj) - if isa(io, ColorDevices) - buf = ColorDevice(IOBuffer()) - else - buf = BlackWDevice(IOBuffer()) - end - _show(buf, obj) + if isa(io, ColorDevices) + buf = ColorDevice(IOBuffer()) + else + buf = BlackWDevice(IOBuffer()) + end + _show(buf, obj) write(io, take!(buf.s)) nothing end diff --git a/src/HTTP.jl b/src/HTTP.jl index da9c205..fe2846a 100644 --- a/src/HTTP.jl +++ b/src/HTTP.jl @@ -292,10 +292,12 @@ function ServerOptions(; ServerOptions(sslconfig, readtimeout, ratelimit, support100continue, chunksize, logbody) end """ - WebSockets.ServerWS(::WebSockets.HandlerFunction, ::WebSockets.WebsocketHandler) + WebSockets.ServerWS(handler::Function, wshandler::Function, logger::IO) WebSockets.ServerWS is an argument type for WebSockets.serve. Instances include .in and .out channels, see WebSockets.serve. + +Server options can be set using keyword arguments, see methods(WebSockets.ServerWS) """ mutable struct ServerWS{T <: Scheme, H <: Handler, W <: WebsocketHandler} handler::H @@ -313,10 +315,21 @@ end # Define ServerWS without wrapping the functions first. Rely on argument sequence. function ServerWS(h::Function, w::Function, l::IO=stdout; cert::String="", key::String="", args...) + ServerWS(HandlerFunction(h), WebsocketHandler(w), l; cert=cert, key=key, ratelimit = 10//1, args...) end +# Define ServerWS with keyword arguments only +function ServerWS(;handler::Function, wshandler::Function, + logger::IO=stdout, + cert::String="", key::String="", args...) + + ServerWS(HandlerFunction(handler), + WebsocketHandler(wshandler), logger; + cert=cert, key=key, ratelimit = 10//1, args...) +end + # Define ServerWS with function wrappers function ServerWS(handler::H, wshandler::W, @@ -325,6 +338,7 @@ function ServerWS(handler::H, key::String = "", ratelimit = 10//1, args...) where {H <: HandlerFunction, W <: WebsocketHandler} + sslconfig = nothing; scheme = http # http is an imported DataType if cert != "" && key != "" @@ -336,7 +350,6 @@ function ServerWS(handler::H, logger, Channel(1), Channel(2), ServerOptions(;ratelimit = ratelimit, args...)) - return serverws end """ WebSockets.serve(server::ServerWS, port) @@ -410,8 +423,9 @@ function serve(server::ServerWS{T, H, W}, host, port, verbose) where {T, H, W} # through the server.in channel, see above. return end -serve(server::WebSockets.ServerWS, host, port) = serve(server, host, port, false) -serve(server::WebSockets.ServerWS, port) = serve(server, "127.0.0.1", port, false) +serve(server::ServerWS; host= "127.0.0.1", port= "") = serve(server, host, port, false) +serve(server::ServerWS, host, port) = serve(server, host, port, false) +serve(server::ServerWS, port) = serve(server, "127.0.0.1", port, false) """ 'checkratelimit!' updates a dictionary of IP addresses which keeps track of their diff --git a/src/show_ws.jl b/src/show_ws.jl index 9024928..a1c7776 100644 --- a/src/show_ws.jl +++ b/src/show_ws.jl @@ -1,14 +1,14 @@ import Base.show -# Long form, as in display(ws) or REPL ws enter +import Base.method_argnames +# Long form, as in display(ws) or REPL: ws enter function Base.show(io::IO, ::MIME"text/plain", ws::WebSocket{T}) where T ioc = IOContext(io, :wslog => true) print(ioc, "WebSocket{", nameof(T), "}(", ws.server ? "server, " : "client, ") _show(ioc, ws.state) print(ioc, "): ") _show(ioc, ws.socket) - nothing end -# Short form, as in print(stdout, ws) +# Short form with state, as in print(stdout, ws) function Base.show(io::IO, ws::WebSocket{T}) where T ioc = IOContext(io, :compact=>true, :wslog => true) if T == TCPSocket @@ -19,21 +19,16 @@ function Base.show(io::IO, ws::WebSocket{T}) where T print(ioc, ws.server ? "server, " : "client, ") _show(ioc, ws.state) print(ioc, ")") - nothing end -# Short form, as in Juno / Atom -# In documentation (and possible future version)::MIME"application/prs.juno.inline" +# The default Juno / Atom display works nicely with standard output Base.show(io::IO, ::MIME"application/prs.juno.inline", ws::WebSocket) = Base.show(io, ws) -Base.show(io::IO, ::MIME"application/juno+inline", ws::WebSocket) = Base.show(io, ws) - # A Base.show method is already defined by @enum function _show(io::IO, state::ReadyState) kwargs, msg = _uv_status_tuple(state) printstyled(io, msg; kwargs...) - nothing end function _show(io::IO, stream::Base.LibuvStream) @@ -46,9 +41,30 @@ function _show(io::IO, stream::Base.LibuvStream) if !(stream isa Servers.UDPSocket) nba = bytesavailable(stream.buffer) nba > 0 && print(io, ", ", nba, " bytes") + nothing end end - nothing +end +function _show(io::IO, stream::IOStream) + # To avoid accidental type piracy, a double check: + if !get(IOContext(io), :wslog, false) + show(io, stream) + else + kwargs, msg = _uv_status_tuple(stream) + printstyled(io, msg; kwargs...) + end +end +function _show(io::IO, buf::Base.GenericIOBuffer) + # To avoid accidental type piracy, a double check: + if !get(IOContext(io), :wslog, false) + show(io, buf) + else + kwargs, msg = _uv_status_tuple(buf) + printstyled(io, msg; kwargs...) + nba = buf.size + nba > 0 && print(io, ", ", nba, " bytes") + nothing + end end "For colorful printing" @@ -85,8 +101,8 @@ function _uv_status_tuple(x) (color = :red,), "invalid status" end end -function _uv_status_tuple(bs::Base.BufferStream) - if bs.is_open +function _uv_status_tuple(bs::Union{Base.BufferStream, IOStream, Base.GenericIOBuffer}) + if isopen(bs) (color = :green,), "✓" # "open" else (color = :red,), "✘" #"closed" @@ -101,3 +117,71 @@ function _uv_status_tuple(status::ReadyState) (color = :red,), "CLOSED" end end + +### ServerWS +function Base.show(io::IO, sws::ServerWS) + print(io, ServerWS, "(handler=") + _show(io, sws.handler.func) + print(io, ", wshandler=") + _show(io, sws.wshandler.func) + if sws.logger != stdout + print(io, ", logger=") + if sws.logger isa Base.GenericIOBuffer + print(io, "IOBuffer():") + elseif sws.logger isa IOStream + print(io, sws.logger.name, ":") + elseif sws.logger == devnull + else + print(io, nameof(typeof(sws.logger)), ":") + end + _show(IOContext(io, :wslog=>true), sws.logger) + end + if sws.options != WebSockets.ServerOptions() + print(io, ", ") + show(IOContext(io, :wslog=>true), sws.options) + end + print(io, ")") + if isready(sws.in) + printstyled(io, ".in:", color= :yellow) + print(io, sws.in, " ") + end + if isready(sws.out) + printstyled(io, ".out:", color= :yellow) + print(io, sws.out, " ") + end +end + + +function Base.show(io::IO, swo::WebSockets.ServerOptions) + hidetype = get(IOContext(io), :wslog, false) + fina = fieldnames(ServerOptions) + hidetype || print(io, ServerOptions, "(") + for field in fina + fiva = getfield(swo, field) + if fiva != nothing + print(io, field, "=") + print(io, fiva) + if field != last(fina) + print(io, ", ") + end + end + end + hidetype || print(io, ")") + nothing +end + + + + +_show(io::IO, x) = show(io, x) +function _show(io::IO, f::Function) + m = methods(f) + if length(m) > 1 + print(io, f, " has ", length(m), " methods: ") + Base.show_method_table(io, m, 4, false) + else + method = first(m) + argnames = join(method_argnames(method)[2:end], ", ") + print(io, method.name, "(", argnames, ")") + end +end diff --git a/test/show_test.jl b/test/show_test.jl index 1a9176e..6f29c3b 100644 --- a/test/show_test.jl +++ b/test/show_test.jl @@ -1,10 +1,13 @@ using Test import Base: C_NULL, LibuvStream, GenericIOBuffer, BufferStream import Sockets: UDPSocket, @ip_str, send, recvfrom, TCPSocket + using WebSockets + import WebSockets:_show, ReadyState, - _uv_status_tuple + _uv_status_tuple, + Response mutable struct DummyStream <: LibuvStream buffer::GenericIOBuffer status::Int @@ -35,6 +38,28 @@ let kws = [], msgs =[] @test msgs == ["CONNECTED", "CLOSING", "CLOSED"] end +let kws = [], msgs =[] + fi = open("temptemp", "w+") + kwarg, msg = _uv_status_tuple(fi) + push!(kws, kwarg) + push!(msgs, msg) + close(fi) + rm("temptemp") + kwarg, msg = _uv_status_tuple(fi) + push!(kws, kwarg) + push!(msgs, msg) + @test kws == [(color = :green,), (color = :red,)] + @test msgs == ["✓", "✘"] +end + + +fi = open("temptemp", "w+") +io = IOBuffer() +_show(IOContext(io, :wslog=>true), fi) +close(fi) +rm("temptemp") +output = String(take!(io)) +@test output == "✓" ds = DummyStream(IOBuffer(), 0, 0x00000001) io = IOBuffer() @@ -104,6 +129,25 @@ output = String(take!(io.io)) # No reporting of zero bytes @test output == "\e[34muninit\e[39m" +iob = IOBuffer() +write(iob, "123") +io = IOContext(IOBuffer(), :color => true, :wslog=>true) +_show(io, iob) +output = String(take!(io.io)) +@test output == "\e[32m✓\e[39m, 3 bytes" + +iob = IOBuffer() +io = IOContext(IOBuffer()) +_show(io, iob) +output = String(take!(io.io)) +@test output == "IOBuffer(data=UInt8[...], readable=true, writable=true, seekable=true, append=false, size=0, maxsize=Inf, ptr=1, mark=-1)" + +io = IOContext(IOBuffer()) +_show(io, devnull) +output = String(take!(io.io)) +@test output == "Base.DevNull()" || output == "Base.DevNullStream()" + + # Short form, as in print(stdout, ws) ws = WebSocket(BufferStream(), true) @@ -125,7 +169,6 @@ show(io, ws) output = String(take!(io.io)) @test output == "WebSocket{BufferStream}(client, \e[32mCONNECTED\e[39m)" - ws = WebSocket(TCPSocket(), false) io = IOContext(IOBuffer(), :color => true) show(io, ws) @@ -140,14 +183,6 @@ show(io, "application/prs.juno.inline", ws) output = String(take!(io.io)) @test output == "WebSocket(client, \e[32mCONNECTED\e[39m)" -#short form for Atom / Juno -ws = WebSocket(TCPSocket(), false) -io = IOContext(IOBuffer(), :color => true) -show(io, "application/juno+inline", ws) -output = String(take!(io.io)) -@test output == "WebSocket(client, \e[32mCONNECTED\e[39m)" - - # Long form, as in print(stdout, ws) ws = WebSocket(TCPSocket(), true) io = IOContext(IOBuffer(), :color => true) @@ -162,3 +197,60 @@ io = IOContext(IOBuffer(), :color => true) show(io, "text/plain", ws) output = String(take!(io.io)) @test output == "WebSocket{BufferStream}(client, \e[32mCONNECTED\e[39m): \e[32m✓\e[39m, 2 bytes" + +### For testing Base.show(ServerWS) +h(r) = Response(200) +w(ws, r) = nothing +io = IOBuffer() +_show(io, h) +output = String(take!(io)) +@test output == "h(r)" + +_show(io, x-> 2x) +output = String(take!(io)) +@test output[1] == '#' + + +sws = ServerWS(h, w) +show(io, sws) +output = String(take!(io)) +@test output == "ServerWS(handler=h(r), wshandler=w(ws, r))" + +sws = ServerWS(h, w, ratelimit = 1 // 1) +show(io, sws) +output = String(take!(io)) +@test output == "ServerWS(handler=h(r), wshandler=w(ws, r), readtimeout=180.0, ratelimit=1//1, support100continue=true, logbody=true)" + +# with loggers +sws = ServerWS(handler= h, wshandler= w, logger = stderr) +io = IOBuffer() +show(io, sws) +output = String(take!(io)) +@test output == "ServerWS(handler=h(r), wshandler=w(ws, r), logger=TTY:✓)" || + output == "ServerWS(handler=h(r), wshandler=w(ws, r), logger=PipeEndpoint():✓)" || + output == "ServerWS(handler=h(r), wshandler=w(ws, r), logger=PipeEndpoint:✓)" +fi = open("temptemp", "w+") +sws = ServerWS(h, w, fi) +io = IOBuffer() +_show(io, sws) +output = String(take!(io)) +@test output == "ServerWS(handler=h(r), wshandler=w(ws, r), logger=:✓)" +close(fi) +_show(io, sws) +rm("temptemp") +output = String(take!(io)) +@test output == "ServerWS(handler=h(r), wshandler=w(ws, r), logger=:✘)" + + +sws = ServerWS(handler= h, wshandler= w, logger = devnull) +io = IOBuffer() +show(io, sws) +output = String(take!(io)) +@test output == "ServerWS(handler=h(r), wshandler=w(ws, r), logger=Base.DevNull())" || + output == "ServerWS(handler=h(r), wshandler=w(ws, r), logger=Base.DevNullStream())" + +sws = ServerWS(handler= h, wshandler= w, logger = IOBuffer()) +io = IOBuffer() +show(io, sws) +output = String(take!(io)) +@test output == "ServerWS(handler=h(r), wshandler=w(ws, r), logger=IOBuffer():✓)" diff --git a/test/throttling_test.jl b/test/throttling_test.jl index cc7089e..e4c83c5 100644 --- a/test/throttling_test.jl +++ b/test/throttling_test.jl @@ -52,7 +52,9 @@ countconnections(1//1, Millisecond(500)) close(tcpserver) @info "Make http request to a server with specified ratelimit 1 new connections // 1 second" -const THISPORT = 8091 +if !@isdefined(THISPORT) + const THISPORT = 8091 +end const IPA = "127.0.0.1" const URL9 = "http://$IPA:$THISPORT" serverWS = ServerWS( (r) -> WebSockets.Response(200, "OK"), @@ -80,7 +82,8 @@ put!(serverWS.in, "closeit") serverWS = ServerWS( (r) -> WebSockets.Response(200, "OK"), (r, ws) -> nothing, sslconfig = SSLConfig()) -tas = @async WebSockets.serve(serverWS, IPA, THISPORT) + +tas = @async WebSockets.serve(serverWS, host = IPA, port =THISPORT) const URL10 = "https://$IPA:$THISPORT" @test_throws WebSockets.HTTP.IOExtras.IOError WebSockets.HTTP.request("GET", URL10) const WSSURI = "wss://$IPA:$THISPORT"