Skip to content

Commit

Permalink
Merge branch 'bugs/314-libuv'
Browse files Browse the repository at this point in the history
  • Loading branch information
ademar committed Oct 31, 2015
2 parents 5b1667f + 5108638 commit 82acd5c
Show file tree
Hide file tree
Showing 23 changed files with 89 additions and 48 deletions.
Empty file modified examples/Asp.NET/App.config
100755 → 100644
Empty file.
Empty file modified examples/Example/Program.fs
100755 → 100644
Empty file.
Empty file modified examples/Fibonacci/Program.fs
100755 → 100644
Empty file.
Empty file modified examples/WebSocket/App.config
100755 → 100644
Empty file.
77 changes: 52 additions & 25 deletions src/Suave.LibUv/Tcp.fs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@ type SingleThreadSynchronizationContext(loop, runOnThisThreadHandler) =

let queue = new ConcurrentQueue<KeyValuePair<SendOrPostCallback, obj>>()

[<DefaultValue>] val mutable running : bool
[<DefaultValue>] val mutable uv_handle_cb : uv_handle_cb

member this.Post( d : SendOrPostCallback, state : obj) =
member this.Post(d : SendOrPostCallback, state : obj) =
if (d = null) then
raise( ArgumentNullException("d"))
else queue.Enqueue(new KeyValuePair<SendOrPostCallback, obj>(d, state))

member this.Send() =
uv_async_send(runOnThisThreadHandler) |> checkStatus
if this.running then
uv_async_send(runOnThisThreadHandler) |> checkStatus

member this.runOnCurrentThread( _ :IntPtr) =
let mutable workItem = KeyValuePair(null,null)
Expand All @@ -45,7 +47,8 @@ type SingleThreadSynchronizationContext(loop, runOnThisThreadHandler) =

member this.init() =
this.uv_handle_cb <- uv_handle_cb(this.runOnCurrentThread)
uv_async_init(loop,runOnThisThreadHandler,this.uv_handle_cb) |> checkStatus
this.running <- true
uv_async_init(loop, runOnThisThreadHandler, this.uv_handle_cb) |> checkStatus

open Suave.Sockets
open Suave.Utils.Async
Expand Down Expand Up @@ -73,7 +76,7 @@ type ReadOp() =
else
this.ok (Choice1Of2 nread)

member this.allocBuffer (_ : IntPtr, suggested_size: int, [<Out>] buff : byref<uv_buf_t>) =
member this.allocBuffer (_ : IntPtr, suggested_size : int, [<Out>] buff : byref<uv_buf_t>) =
if isWinNT then
buff.``base`` <- IntPtr(this.buf.Count)
buff.len <- Marshal.UnsafeAddrOfPinnedArrayElement(this.buf.Array,this.buf.Offset)
Expand Down Expand Up @@ -123,10 +126,15 @@ type OperationPair = ReadOp*WriteOp

open Suave.Logging

type LibUvTransport(pool : ConcurrentPool<OperationPair>,loop : IntPtr,client : Handle, synchronizationContext : SingleThreadSynchronizationContext,logger : Logger) =
type LibUvTransport(pool : ConcurrentPool<OperationPair>,
loop : IntPtr,
client : Handle,
synchronizationContext : SingleThreadSynchronizationContext,
logger : Logger) =

[<DefaultValue>] val mutable uv_close_cb : uv_close_cb
[<DefaultValue>] val mutable cont : unit -> unit
[<DefaultValue>] val mutable pin : GCHandle

let (readOp,writeOp) = pool.Pop()

Expand All @@ -141,8 +149,9 @@ type LibUvTransport(pool : ConcurrentPool<OperationPair>,loop : IntPtr,client :

member this.initialize() =
this.uv_close_cb <- uv_close_cb(this.closeCallback)
this.pin <- GCHandle.Alloc(this)

member this.shutdown() =
member this.shutdown () =
Async.FromContinuations <| fun (ok, _, _) ->
synchronizationContext.Post(SendOrPostCallback(fun o -> this.close ok),null)
synchronizationContext.Send()
Expand All @@ -155,8 +164,14 @@ type LibUvTransport(pool : ConcurrentPool<OperationPair>,loop : IntPtr,client :

member this.write(buf : ByteSegment) =
Async.FromContinuations <| fun (ok, _, _) ->
synchronizationContext.Post(SendOrPostCallback(fun o -> writeOp.writeStart client buf ok),null)
synchronizationContext.Send()
synchronizationContext.Post(SendOrPostCallback(fun o -> writeOp.writeStart client buf ok),null)
synchronizationContext.Send()

member this.shutdown() = async{
do! this.shutdown()
do this.pin.Free()
}


let createLibUvOpsPool maxOps =

Expand Down Expand Up @@ -195,10 +210,17 @@ open Suave.Types
open Suave.Tcp
open Suave

type LibUvSocket(pool : ConcurrentPool<OperationPair>,logger, serveClient, ip, loop , bufferManager, startData, acceptingConnections: AsyncResultCell<StartedData>,synchronizationContextCallback) =
type LibUvSocket(pool : ConcurrentPool<OperationPair>,
logger,
serveClient,
ip,
loop,
bufferManager,
startData,
acceptingConnections: AsyncResultCell<StartedData>,
synchronizationContext) =

[<DefaultValue>] val mutable uv_connection_cb : uv_connection_cb
[<DefaultValue>] val mutable synchronizationContext : SingleThreadSynchronizationContext

member this.onNewConnection (server : IntPtr) (status: int) =

Expand All @@ -211,17 +233,14 @@ type LibUvSocket(pool : ConcurrentPool<OperationPair>,logger, serveClient, ip, l
uv_tcp_init(loop, client) |> checkStatus

if (uv_accept(server, client) = 0) then
let transport = new LibUvTransport(pool,loop,client,this.synchronizationContext,logger)
let transport = new LibUvTransport(pool,loop,client,synchronizationContext,logger)
transport.initialize()
Async.Start <|
job logger serveClient ip transport bufferManager (transport.shutdown())
Async.Start (job logger serveClient ip transport bufferManager)
else
destroyHandle client

member this.initialize() =
this.uv_connection_cb <- uv_connection_cb(this.onNewConnection)
this.synchronizationContext <- new SingleThreadSynchronizationContext(loop, synchronizationContextCallback)
this.synchronizationContext.init()

member this.run(server) =

Expand Down Expand Up @@ -254,10 +273,12 @@ type LibUvServer(maxConcurrentOps, bufferManager, logger : Logger,
event : ManualResetEvent) =

[<DefaultValue>] val mutable thread : Thread
[<DefaultValue>] val mutable synchronizationContext : SingleThreadSynchronizationContext
[<DefaultValue>] val mutable uv_async_stop_loop_cb : uv_handle_cb
[<DefaultValue>] val mutable uv_close_cb_destroy : uv_close_cb
[<DefaultValue>] val mutable uv_close_cb_thread : uv_close_cb
[<DefaultValue>] val mutable uv_close_cb_loop : uv_close_cb
[<DefaultValue>] val mutable uv_close_cb_handler : uv_close_cb
[<DefaultValue>] val mutable uv_walk_cb : uv_walk_cb

let mutable addr = sockaddr_in( a = 0L, b= 0L, c = 0L, d = 0L)
Expand All @@ -268,7 +289,7 @@ type LibUvServer(maxConcurrentOps, bufferManager, logger : Logger,
let ip = binding.ip.ToString()
let port = int binding.port

let stopLoopCallback = createHandle <| uv_handle_size(uv_handle_type.UV_ASYNC)
let stopLoopCallbackHandle = createHandle <| uv_handle_size(uv_handle_type.UV_ASYNC)
let synchronizationContextCallback = createHandle <| uv_handle_size(uv_handle_type.UV_ASYNC)

let closeEvent = new ManualResetEvent(false)
Expand All @@ -284,12 +305,13 @@ type LibUvServer(maxConcurrentOps, bufferManager, logger : Logger,
uv_tcp_bind(server, &addr, 0) |> checkStatus
let s = new LibUvSocket(opsPool, logger, serveClient, binding, loop,
bufferManager, startData, acceptingConnections,
synchronizationContextCallback)
this.synchronizationContext)
s.initialize()
s.run(server)
s.exit()
with ex ->
Log.infoe logger "Suave.LibUv.Tcp.LibUvServer.run" TraceHeader.empty ex "could not start LibUvSocket"
closeEvent.WaitOne() |> ignore
destroyHandle loop
event.Set() |> ignore
Log.info logger "Suave.LibUv.Tcp.LibUvServer.run" TraceHeader.empty "exiting server."
Expand All @@ -298,11 +320,11 @@ type LibUvServer(maxConcurrentOps, bufferManager, logger : Logger,
Marshal.FreeCoTaskMem handle

member this.closeHandler (handle : IntPtr) (arg : IntPtr) =
uv_close(handle,uv_close_cb(this.closeHandlerCallback))
uv_close(handle,this.uv_close_cb_handler)

member this.uv_stop_loop (_ : IntPtr) =
member this.stopLoopCallback (_ : IntPtr) =
Log.info logger "Suave.LibUv.Tcp.LibUvServer.uv_stop_loop" TraceHeader.empty "-->"
uv_close(stopLoopCallback, this.uv_close_cb_loop)
uv_close(stopLoopCallbackHandle, this.uv_close_cb_loop)
Log.info logger "Suave.LibUv.Tcp.LibUvServer.uv_stop_loop" TraceHeader.empty "<--"

member this.init() =
Expand All @@ -311,19 +333,22 @@ type LibUvServer(maxConcurrentOps, bufferManager, logger : Logger,
this.uv_close_cb_destroy <- uv_close_cb(this.destroyServerCallback)
this.uv_close_cb_thread <- uv_close_cb(this.destroyRunOnThisThreadCallback)
this.uv_close_cb_loop <- uv_close_cb(this.destroyLoopCallback)
this.uv_async_stop_loop_cb <- uv_handle_cb(this.uv_stop_loop)
this.uv_async_stop_loop_cb <- uv_handle_cb(this.stopLoopCallback)
this.uv_close_cb_handler <- uv_close_cb(this.closeHandlerCallback)
this.uv_walk_cb <- uv_walk_cb(this.closeHandler)

member this.initLoop () =
uv_loop_init(loop) |> checkStatus
uv_async_init(loop, stopLoopCallback, this.uv_async_stop_loop_cb) |> checkStatus
uv_async_init(loop, stopLoopCallbackHandle, this.uv_async_stop_loop_cb) |> checkStatus
this.synchronizationContext <- new SingleThreadSynchronizationContext(loop, synchronizationContextCallback)
this.synchronizationContext.init()

member this.start() =
this.thread.Start()

member this.stopLoop() =
Log.info logger "Suave.LibUv.Tcp.LibUvServer.stopLoop" TraceHeader.empty "-->"
uv_async_send (stopLoopCallback) |> checkStatus
uv_async_send stopLoopCallbackHandle |> checkStatus
closeEvent.WaitOne() |> ignore
Log.info logger "Suave.LibUv.Tcp.LibUvServer.stopLoop" TraceHeader.empty "<--"

Expand All @@ -342,7 +367,8 @@ type LibUvServer(maxConcurrentOps, bufferManager, logger : Logger,

member private this.destroyLoopCallback _ =
Log.info logger "Suave.LibUv.Tcp.LibUvServer.destroyLoopCallback" TraceHeader.empty "-->"
destroyHandle stopLoopCallback
destroyHandle stopLoopCallbackHandle
this.synchronizationContext.running <- false
uv_close(synchronizationContextCallback, this.uv_close_cb_thread)
Log.info logger "Suave.LibUv.Tcp.LibUvServer.destroyLoopCallback" TraceHeader.empty "<--"

Expand All @@ -352,7 +378,8 @@ let runServerLibUv logger maxConcurrentOps bufferSize (binding: SocketBinding) s

let exitEvent = new ManualResetEvent(false)

let libUvServer = new LibUvServer(maxConcurrentOps, bufferManager, logger, binding, startData, serveClient, acceptingConnections, exitEvent)
let libUvServer = new LibUvServer(maxConcurrentOps, bufferManager,
logger, binding, startData, serveClient, acceptingConnections, exitEvent)

libUvServer.init()
async {
Expand Down
2 changes: 2 additions & 0 deletions src/Suave.OpenSSL/Provider.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type TlsTransport(cn : Connection, ssl) =
interface ITransport with
member this.read (buf : ByteSegment) = sslReceive cn ssl buf
member this.write(buf : ByteSegment) = sslSend cn ssl buf
member this.shutdown() =
cn.transport.shutdown()

type OpenSSLProvider(cert : X509Certificate) =

Expand Down
Empty file modified src/Suave.Razor/Library.fs
100755 → 100644
Empty file.
Empty file modified src/Suave.Tests/Auth.fs
100755 → 100644
Empty file.
Empty file modified src/Suave.Tests/HttpVerbs.fs
100755 → 100644
Empty file.
Empty file modified src/Suave.Tests/HttpWriters.fs
100755 → 100644
Empty file.
Empty file modified src/Suave.Tests/Program.fs
100755 → 100644
Empty file.
Empty file modified src/Suave.Tests/Proxy.fs
100755 → 100644
Empty file.
Empty file modified src/Suave.Tests/Razor.fs
100755 → 100644
Empty file.
Empty file modified src/Suave.Tests/Sscanf.fs
100755 → 100644
Empty file.
Empty file modified src/Suave.Tests/embedded-resource.txt
100755 → 100644
Empty file.
Empty file modified src/Suave.Tests/request-hangs.txt
100755 → 100644
Empty file.
Empty file modified src/Suave/Http.fs
100755 → 100644
Empty file.
2 changes: 1 addition & 1 deletion src/Suave/Sockets/Connection.fs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ module Connection =

let empty : Connection =
{ socketBinding = SocketBinding.mk IPAddress.IPv6Loopback 8083us
transport = { socket = null; readArgs = null; writeArgs = null }
transport = null
bufferManager = null
lineBuffer = ArraySegment<byte>()
segments = [] }
Expand Down
4 changes: 3 additions & 1 deletion src/Suave/Sockets/ITransport.fs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace Suave.Sockets

[<AllowNullLiteral>]
type ITransport =
abstract member read : ByteSegment -> SocketOp<int>
abstract member write : ByteSegment -> SocketOp<unit>
abstract member write : ByteSegment -> SocketOp<unit>
abstract member shutdown : unit -> Async<unit>
33 changes: 26 additions & 7 deletions src/Suave/Sockets/TcpTransport.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,32 @@
open System
open System.Net.Sockets

type TcpTransport =
{ socket : Socket
readArgs : SocketAsyncEventArgs
writeArgs : SocketAsyncEventArgs
}
type TcpTransport(acceptArgs: SocketAsyncEventArgs,
acceptArgsPool: ConcurrentPool<SocketAsyncEventArgs>,
readArgsPool : ConcurrentPool<SocketAsyncEventArgs>,
writeArgsPool : ConcurrentPool<SocketAsyncEventArgs>) =
let socket = acceptArgs.AcceptSocket
let readArgs = readArgsPool.Pop()
let writeArgs = writeArgsPool.Pop()
let shutdownSocket _ =
try
if socket <> null then
try
socket.Shutdown(SocketShutdown.Both)
with _ -> ()
socket.Close ()
socket.Dispose ()
with _ -> ()
interface ITransport with
member this.read(buf : ByteSegment) =
asyncDo this.socket.ReceiveAsync (setBuffer buf) (fun a -> a.BytesTransferred) this.readArgs
asyncDo socket.ReceiveAsync (setBuffer buf) (fun a -> a.BytesTransferred) readArgs
member this.write(buf : ByteSegment) =
asyncDo this.socket.SendAsync (setBuffer buf) ignore this.writeArgs
asyncDo socket.SendAsync (setBuffer buf) ignore writeArgs
member this.shutdown() = async {
shutdownSocket ()
acceptArgs.AcceptSocket <- null
acceptArgsPool.Push acceptArgs
readArgsPool.Push readArgs
writeArgsPool.Push writeArgs
return ()
}
19 changes: 5 additions & 14 deletions src/Suave/Tcp.fs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ let job logger
(serveClient : TcpWorker<unit>)
binding
(transport : ITransport)
(bufferManager : BufferManager)
(shutdownTransport: Async<unit>) = async {
(bufferManager : BufferManager) = async {
let intern = Log.intern logger "Suave.Tcp.job"
Interlocked.Increment Globals.numberOfClients |> ignore
intern (binding.ip.ToString() + " connected, total: " + (!Globals.numberOfClients).ToString() + " clients")
Expand All @@ -118,7 +117,7 @@ let job logger
}
try
use! oo = Async.OnCancel (fun () -> intern "disconnected client (async cancel)"
Async.RunSynchronously shutdownTransport)
Async.RunSynchronously (transport.shutdown()))
do! serveClient connection
with
| :? System.IO.EndOfStreamException ->
Expand All @@ -131,7 +130,7 @@ let job logger
"tcp request processing failed"
bufferManager.FreeBuffer(connection.lineBuffer, "Suave.Tcp.job") // buf free OK
intern "Shutting down transport."
do! shutdownTransport
do! transport.shutdown()
Interlocked.Decrement(Globals.numberOfClients) |> ignore
intern (binding.ip.ToString() + " disconnected, total: " + (!Globals.numberOfClients).ToString() + " clients")
}
Expand Down Expand Up @@ -172,16 +171,8 @@ let runServer logger maxConcurrentOps bufferSize (binding: SocketBinding) startD
let remoteBinding =
let rep = socket.RemoteEndPoint :?> IPEndPoint
{ ip = rep.Address; port = uint16 rep.Port }
let readArgs = b.Pop()
let writeArgs = c.Pop()
let transport = { socket = socket; readArgs = readArgs; writeArgs = writeArgs }
let shutdownTransport _ =
shutdownSocket socket
acceptArgs.AcceptSocket <- null
a.Push acceptArgs
b.Push readArgs
c.Push writeArgs
Async.Start (job logger serveClient remoteBinding transport bufferManager (async { do shutdownTransport()}), token)
let transport = new TcpTransport(acceptArgs, a, b, c)
Async.Start (job logger serveClient remoteBinding transport bufferManager, token)
| Choice2Of2 e -> failwith "failed to accept."
with ex -> "failed to accept a client" |> Log.interne logger "Suave.Tcp.tcpIpServer" ex
return ()
Expand Down
Empty file modified src/Suave/Web.fs
100755 → 100644
Empty file.
Empty file modified src/Suave/WebSocket.fs
100755 → 100644
Empty file.

0 comments on commit 82acd5c

Please sign in to comment.