diff --git a/src/net45/Extensions/WampSharp.Owin/Owin/OwinWebSocketWrapper.cs b/src/net45/Extensions/WampSharp.Owin/Owin/OwinWebSocketWrapper.cs index 3f9433e30..538d85d6c 100644 --- a/src/net45/Extensions/WampSharp.Owin/Owin/OwinWebSocketWrapper.cs +++ b/src/net45/Extensions/WampSharp.Owin/Owin/OwinWebSocketWrapper.cs @@ -13,6 +13,7 @@ internal class OwinWebSocketWrapper : IWebSocketWrapper private readonly Func, int, bool, CancellationToken, Task> mSendAsync; private readonly Func, CancellationToken, Task>> mReceiveAsync; private readonly Func mCloseAsync; + private WebSocketState mState = WebSocketState.Open; private const string WebSocketSendAsync = "websocket.SendAsync"; private const string WebSocketReceiveAsync = "websocket.ReceiveAsync"; @@ -95,7 +96,29 @@ public async Task ReceiveAsync await mReceiveAsync(arraySegment, callCancelled) .ConfigureAwait(false); - return new WebSocketReceiveResult(count: result.Item3, messageType: GetMessageType(result.Item1), endOfMessage: result.Item2); + WebSocketMessageType webSocketMessageType = GetMessageType(result.Item1); + + if (webSocketMessageType == WebSocketMessageType.Close) + { + ChangeState(actionDone: WebSocketState.CloseReceived, + dualAction: WebSocketState.CloseSent); + + return new WebSocketReceiveResult(count: result.Item3, + messageType: webSocketMessageType, + endOfMessage: result.Item2, + closeStatus: this.ClientCloseStatus, + closeStatusDescription: WebSocketClientCloseDescription); + } + + return new WebSocketReceiveResult(count: result.Item3, messageType: webSocketMessageType, endOfMessage: result.Item2); + } + + public WebSocketState State + { + get + { + return mState; + } } public Task SendAsync(ArraySegment data, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancel) @@ -105,16 +128,21 @@ public Task SendAsync(ArraySegment data, WebSocketMessageType messageType, public Task CloseAsync(WebSocketCloseStatus closeStatus, string closeDescription, CancellationToken cancel) { + ChangeState(actionDone: WebSocketState.CloseSent, + dualAction: WebSocketState.CloseReceived); + return mCloseAsync((int) closeStatus, closeDescription, cancel); } - public bool IsConnected + private void ChangeState(WebSocketState actionDone, WebSocketState dualAction) { - get + if (State == WebSocketState.Open) { - WebSocketCloseStatus? closeStatus = ClientCloseStatus; - - return ((closeStatus == null) || (closeStatus == 0)); + mState = actionDone; + } + else if (mState == dualAction) + { + mState = WebSocketState.Closed; } } diff --git a/src/net45/Extensions/WampSharp.Owin/WampSharp.Owin.project.json b/src/net45/Extensions/WampSharp.Owin/WampSharp.Owin.project.json index faf891318..4d1a162a5 100644 --- a/src/net45/Extensions/WampSharp.Owin/WampSharp.Owin.project.json +++ b/src/net45/Extensions/WampSharp.Owin/WampSharp.Owin.project.json @@ -1,8 +1,11 @@ -{ +{ "frameworks": { "net45": {} }, "runtimes": { "win": {} + }, + "dependencies": { + "Microsoft.Owin": "3.0.1" } } \ No newline at end of file diff --git a/src/net45/Extensions/WampSharp.WebSockets/WebSockets/WebSocketWrapperConnection.cs b/src/net45/Extensions/WampSharp.WebSockets/WebSockets/WebSocketWrapperConnection.cs index 4d37a2c01..45682a26d 100644 --- a/src/net45/Extensions/WampSharp.WebSockets/WebSockets/WebSocketWrapperConnection.cs +++ b/src/net45/Extensions/WampSharp.WebSockets/WebSockets/WebSocketWrapperConnection.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using WampSharp.Core.Listener; using WampSharp.Core.Message; +using WampSharp.Logging; using WampSharp.V2.Authentication; using WampSharp.V2.Binding.Parsers; @@ -16,8 +17,9 @@ public abstract class WebSocketWrapperConnection : AsyncWebSocketWampC { private readonly IWampStreamingMessageParser mParser; private readonly IWebSocketWrapper mWebSocket; - private readonly CancellationTokenSource mCancellationTokenSource; + private CancellationTokenSource mCancellationTokenSource; private readonly Uri mAddressUri; + private CancellationToken mCancellationToken; public WebSocketWrapperConnection(IWebSocketWrapper webSocket, IWampStreamingMessageParser parser, ICookieProvider cookieProvider, ICookieAuthenticatorFactory cookieAuthenticatorFactory) : base(cookieProvider, cookieAuthenticatorFactory) @@ -25,6 +27,7 @@ public WebSocketWrapperConnection(IWebSocketWrapper webSocket, IWampStreamingMes mWebSocket = webSocket; mParser = parser; mCancellationTokenSource = new CancellationTokenSource(); + mCancellationToken = mCancellationTokenSource.Token; } protected WebSocketWrapperConnection(IClientWebSocketWrapper clientWebSocket, Uri addressUri, string protocolName, IWampStreamingMessageParser parser) : @@ -36,8 +39,9 @@ protected WebSocketWrapperConnection(IClientWebSocketWrapper clientWebSocket, Ur protected override Task SendAsync(WampMessage message) { + mLogger.Debug("Attempting to send a message"); ArraySegment messageToSend = GetMessageInBytes(message); - return mWebSocket.SendAsync(messageToSend, WebSocketMessageType, true, mCancellationTokenSource.Token); + return mWebSocket.SendAsync(messageToSend, WebSocketMessageType, true, mCancellationToken); } protected abstract ArraySegment GetMessageInBytes(WampMessage message); @@ -48,12 +52,12 @@ protected async void Connect() { try { - await this.ClientWebSocket.ConnectAsync(mAddressUri, mCancellationTokenSource.Token) + await this.ClientWebSocket.ConnectAsync(mAddressUri, mCancellationToken) .ConfigureAwait(false); RaiseConnectionOpen(); - Task task = Task.Run((Func) this.RunAsync, mCancellationTokenSource.Token); + Task task = Task.Run((Func) this.RunAsync, mCancellationToken); } catch (Exception ex) { @@ -87,36 +91,13 @@ data is very small. MemoryStream memoryStream = new MemoryStream(); // Checks WebSocket state. - while (mWebSocket.IsConnected) + while (IsConnected && !mCancellationToken.IsCancellationRequested) { // Reads data. - WebSocketReceiveResult webSocketReceiveResult; + WebSocketReceiveResult webSocketReceiveResult = + await ReadMessage(receivedDataBuffer, memoryStream); - long length = 0; - do - { - webSocketReceiveResult = - await mWebSocket.ReceiveAsync(receivedDataBuffer, mCancellationTokenSource.Token) - .ConfigureAwait(false); - - length += webSocketReceiveResult.Count; - - await memoryStream.WriteAsync(receivedDataBuffer.Array, receivedDataBuffer.Offset, - webSocketReceiveResult.Count, mCancellationTokenSource.Token) - .ConfigureAwait(false); - - } while (!webSocketReceiveResult.EndOfMessage); - - // If input frame is cancelation frame, send close command. - if (webSocketReceiveResult.MessageType == WebSocketMessageType.Close) - { - this.RaiseConnectionClosed(); - - await mWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, - String.Empty, mCancellationTokenSource.Token) - .ConfigureAwait(false); - } - else + if (webSocketReceiveResult.MessageType != WebSocketMessageType.Close) { memoryStream.Position = 0; OnNewMessage(memoryStream); @@ -128,9 +109,62 @@ await mWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, } catch (Exception ex) { - RaiseConnectionError(ex); - RaiseConnectionClosed(); + // Cancellation token could be cancelled in Dispose if a + // Goodbye message has been received. + if (!(ex is OperationCanceledException) || + !mCancellationToken.IsCancellationRequested) + { + RaiseConnectionError(ex); + } } + + if (mWebSocket.State != WebSocketState.CloseReceived && + mWebSocket.State != WebSocketState.Closed) + { + await CloseWebSocket().ConfigureAwait(false); + } + + RaiseConnectionClosed(); + } + + private async Task CloseWebSocket() + { + try + { + await mWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, + String.Empty, + CancellationToken.None) + .ConfigureAwait(false); + } + catch (Exception ex) + { + mLogger.WarnException("Failed sending a close message to client", ex); + } + } + + private async Task ReadMessage(ArraySegment receivedDataBuffer, MemoryStream memoryStream) + { + WebSocketReceiveResult webSocketReceiveResult; + + long length = 0; + + do + { + webSocketReceiveResult = + await mWebSocket.ReceiveAsync(receivedDataBuffer, mCancellationToken) + .ConfigureAwait(false); + + length += webSocketReceiveResult.Count; + + await memoryStream.WriteAsync(receivedDataBuffer.Array, + receivedDataBuffer.Offset, + webSocketReceiveResult.Count, + mCancellationToken) + .ConfigureAwait(false); + } + while (!webSocketReceiveResult.EndOfMessage); + + return webSocketReceiveResult; } private void OnNewMessage(MemoryStream payloadData) @@ -142,14 +176,16 @@ private void OnNewMessage(MemoryStream payloadData) protected override void Dispose() { mCancellationTokenSource.Cancel(); + mCancellationTokenSource.Dispose(); + mCancellationTokenSource = null; } protected override bool IsConnected { get { - return mWebSocket.IsConnected; + return mWebSocket.State == WebSocketState.Open; } } } -} \ No newline at end of file +} diff --git a/src/net45/Extensions/WampSharp.WebSockets/WebSockets/Wrapper/IWebSocketWrapper.cs b/src/net45/Extensions/WampSharp.WebSockets/WebSockets/Wrapper/IWebSocketWrapper.cs index 5d448db06..ac819a986 100644 --- a/src/net45/Extensions/WampSharp.WebSockets/WebSockets/Wrapper/IWebSocketWrapper.cs +++ b/src/net45/Extensions/WampSharp.WebSockets/WebSockets/Wrapper/IWebSocketWrapper.cs @@ -7,7 +7,7 @@ namespace WampSharp.WebSockets { public interface IWebSocketWrapper { - bool IsConnected { get; } + WebSocketState State { get; } Task ReceiveAsync(ArraySegment arraySegment, CancellationToken callCancelled); Task SendAsync(ArraySegment data, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancel); Task CloseAsync(WebSocketCloseStatus closeStatus, string closeDescription, CancellationToken cancel); diff --git a/src/net45/Extensions/WampSharp.WebSockets/WebSockets/Wrapper/WebSocketWrapper.cs b/src/net45/Extensions/WampSharp.WebSockets/WebSockets/Wrapper/WebSocketWrapper.cs index 0d9c43723..b0bc6e1f7 100644 --- a/src/net45/Extensions/WampSharp.WebSockets/WebSockets/Wrapper/WebSocketWrapper.cs +++ b/src/net45/Extensions/WampSharp.WebSockets/WebSockets/Wrapper/WebSocketWrapper.cs @@ -14,14 +14,6 @@ public WebSocketWrapper(WebSocket webSocket) mWebSocket = webSocket; } - public bool IsConnected - { - get - { - return mWebSocket.State == WebSocketState.Open; - } - } - public Task ReceiveAsync(ArraySegment arraySegment, CancellationToken callCancelled) { return mWebSocket.ReceiveAsync(arraySegment, callCancelled); @@ -36,5 +28,10 @@ public Task CloseAsync(WebSocketCloseStatus closeStatus, string closeDescription { return mWebSocket.CloseAsync(closeStatus, closeDescription, cancel); } + + public WebSocketState State + { + get { return mWebSocket.State; } + } } } \ No newline at end of file diff --git a/src/net45/WampSharp/Core/Listener/Connections/AsyncConnection/AsyncWampConnection.cs b/src/net45/WampSharp/Core/Listener/Connections/AsyncConnection/AsyncWampConnection.cs index 67b64ed7d..67869ce09 100644 --- a/src/net45/WampSharp/Core/Listener/Connections/AsyncConnection/AsyncWampConnection.cs +++ b/src/net45/WampSharp/Core/Listener/Connections/AsyncConnection/AsyncWampConnection.cs @@ -12,10 +12,11 @@ public abstract class AsyncWampConnection : IWampConnection, { private readonly ActionBlock> mSendBlock; protected readonly ILog mLogger; + private int mDisposeCalled = 0; protected AsyncWampConnection() { - mLogger = LogProvider.GetLogger(this.GetType()); + mLogger = new LoggerWithConnectionId(LogProvider.GetLogger(this.GetType())); mSendBlock = new ActionBlock>(x => InnerSend(x)); } @@ -112,6 +113,7 @@ protected virtual void RaiseMessageArrived(WampMessage message) protected virtual void RaiseConnectionClosed() { + mLogger.Debug("Connection has been closed"); var handler = ConnectionClosed; if (handler != null) handler(this, EventArgs.Empty); } @@ -125,9 +127,12 @@ protected virtual void RaiseConnectionError(Exception ex) void IDisposable.Dispose() { - mSendBlock.Complete(); - mSendBlock.Completion.Wait(); - this.Dispose(); + if (Interlocked.CompareExchange(ref mDisposeCalled, 1, 0) == 0) + { + mSendBlock.Complete(); + mSendBlock.Completion.Wait(); + this.Dispose(); + } } protected abstract void Dispose(); @@ -136,9 +141,12 @@ void IDisposable.Dispose() async Task IAsyncDisposable.DisposeAsync() { - mSendBlock.Complete(); - await mSendBlock.Completion; - this.Dispose(); + if (Interlocked.CompareExchange(ref mDisposeCalled, 1, 0) == 0) + { + mSendBlock.Complete(); + await mSendBlock.Completion; + this.Dispose(); + } } #else @@ -151,5 +159,26 @@ Task IAsyncDisposable.DisposeAsync() #endif + // TODO: move this to another file (after making it more generic) + // TODO: or get rid of this. + private class LoggerWithConnectionId : ILog + { + private readonly ILog mLogger; + private readonly string mConnectionId; + + public LoggerWithConnectionId(ILog logger) + { + mConnectionId = Guid.NewGuid().ToString(); + mLogger = logger; + } + + public bool Log(LogLevel logLevel, Func messageFunc, Exception exception = null, params object[] formatParameters) + { + using (LogProvider.OpenMappedContext("ConncetionId", mConnectionId)) + { + return mLogger.Log(logLevel, messageFunc, exception, formatParameters); + } + } + } } } \ No newline at end of file