From c4cf3d9f3fc971abeb1216afb4dd4f929f4af04a Mon Sep 17 00:00:00 2001 From: Premysl Paska Date: Mon, 15 Mar 2021 17:58:56 +0100 Subject: [PATCH] Pass around and use StreamWriter for replies. --- .../Communication/PluginServer.cs | 53 ++++++++++--------- .../Communication/RequestItem.cs | 11 ++-- .../Ivxr.SePlugin/Communication/Dispatcher.cs | 2 +- .../Session/SessionDispatcher.cs | 4 +- 4 files changed, 37 insertions(+), 33 deletions(-) diff --git a/Source/Ivxr.PlugIndependentLib/Communication/PluginServer.cs b/Source/Ivxr.PlugIndependentLib/Communication/PluginServer.cs index 6a9ba1315..a8a471e70 100644 --- a/Source/Ivxr.PlugIndependentLib/Communication/PluginServer.cs +++ b/Source/Ivxr.PlugIndependentLib/Communication/PluginServer.cs @@ -80,7 +80,7 @@ private void Serve(TcpListener listener) try { stream.ReadTimeout = ReadTimeoutMs; - ServeConnectedClient(stream); + ServeConnectedClient(stream, client.ReceiveBufferSize); } catch (IOException e) { @@ -90,32 +90,38 @@ private void Serve(TcpListener listener) } } - private void ServeConnectedClient(NetworkStream stream) + private void ServeConnectedClient(NetworkStream stream, int bufferSize) { - using (var reader = new StreamReader(stream, Encoding.ASCII)) + var encoding = Encoding.UTF8; + + using (var reader = new StreamReader(stream, encoding, false, bufferSize)) + using (var writer = new StreamWriter(stream, encoding, 4 * bufferSize)) // Replies are bigger. { + writer.AutoFlush = true; + string message; while ((message = reader.ReadLine()) != null) { m_log.WriteLine($"Read message: {message}"); - ProcessMessage(stream, message, out bool disconnected); - if (disconnected) + ProcessMessage(stream, writer, message, out bool disconnect); + if (disconnect) break; } } } - private void ProcessMessage(NetworkStream clientStream, string message, out bool disconnected) + private void ProcessMessage(NetworkStream clientStream, StreamWriter writer, string message, + out bool disconnect) { - disconnected = false; + disconnect = false; if (!message.StartsWith("{\"Cmd\":")) { // throw new InvalidDataException("Unexpected message header: " + message); m_log.WriteLine("Unexpected message header: " + message); // We disconnect here, because the outer loop can't handle a request without a reply from the queue. - Disconnect(clientStream, out disconnected, reply: false); + FlagDisconnect(writer, out disconnect, reply: false); return; } @@ -124,57 +130,54 @@ private void ProcessMessage(NetworkStream clientStream, string message, out bool // ReSharper disable once StringLiteralTypo if (command.StartsWith("\"AGENTCOM")) // AGENTCOMMAND { - m_requestQueue.Requests.Enqueue(new RequestItem(clientStream, message)); + m_requestQueue.Requests.Enqueue(new RequestItem(writer, message)); // TODO(PP): This tends to block when the message is not added to the queue. Rearchitecture, or at least add a timeout. WaitForReplyAndSendIt(); } else if (command.StartsWith("\"SESSION\"")) { - m_sessionDispatcher.ProcessRequest(new RequestItem(clientStream, message)); + m_sessionDispatcher.ProcessRequest(new RequestItem(writer, message)); } else if (command.StartsWith("\"DISCONNECT\"")) { - Disconnect(clientStream, out disconnected, reply: true); + FlagDisconnect(writer, out disconnect, reply: true); } else { // throw new NotImplementedException("Command unknown or not implemented: " + command); m_log.WriteLine("Command unknown or not implemented: " + command); // We disconnect here, because the outer loop can't handle a request without a reply from the queue. - Disconnect(clientStream, out disconnected, reply: false); + FlagDisconnect(writer, out disconnect, reply: false); } } - private void Disconnect(NetworkStream clientStream, out bool disconnected, bool reply = true) + private void FlagDisconnect(StreamWriter writer, out bool disconnect, bool reply = true) { - Reply(clientStream, reply.ToString()); - clientStream.Close(timeout: 100); // ms - disconnected = true; + Reply(writer, reply.ToString()); + disconnect = true; } private void WaitForReplyAndSendIt() { // TODO(PP): consider adding a timeout (blocks when no reply ready) var reply = m_requestQueue.Replies.Take(); - Reply(reply.ClientStream, reply.Message); + Reply(reply.ClientStreamWriter, reply.Message); } - public static void Reply(NetworkStream clientStream, string reply) + public static void Reply(StreamWriter writer, string reply) { - // TODO(PP): avoid allocation of a new buffer each time - var replyBuffer = Encoding.ASCII.GetBytes(reply + '\n'); - clientStream.Write(replyBuffer, 0, replyBuffer.Length); + writer.WriteLine(reply); } - public static void ReplyOK(NetworkStream clientStream) + public static void ReplyOK(StreamWriter clientStreamWriter) { - Reply(clientStream, "true"); + Reply(clientStreamWriter, "true"); } - public static void ReplyFalse(NetworkStream clientStream) + public static void ReplyFalse(StreamWriter clientStreamWriter) { - Reply(clientStream, "false"); + Reply(clientStreamWriter, "false"); } } } \ No newline at end of file diff --git a/Source/Ivxr.PlugIndependentLib/Communication/RequestItem.cs b/Source/Ivxr.PlugIndependentLib/Communication/RequestItem.cs index 4a8170b10..b7b44e6da 100644 --- a/Source/Ivxr.PlugIndependentLib/Communication/RequestItem.cs +++ b/Source/Ivxr.PlugIndependentLib/Communication/RequestItem.cs @@ -1,21 +1,22 @@ -using System.Linq; +using System.IO; +using System.Linq; using System.Net.Sockets; using System.Text.RegularExpressions; namespace Iv4xr.PluginLib { /// - /// Request item of the request queue, carring its context. + /// Request item of the request queue, carrying its context. /// public class RequestItem { - public RequestItem(NetworkStream clientStream, string message) + public RequestItem(StreamWriter clientStreamWriter, string message) { - ClientStream = clientStream; + ClientStreamWriter = clientStreamWriter; Message = message; } - public NetworkStream ClientStream { get; } + public StreamWriter ClientStreamWriter { get; } public string Message { get; } // Maybe we'll add something more structured later. public string GetCmd() diff --git a/Source/Ivxr.SePlugin/Communication/Dispatcher.cs b/Source/Ivxr.SePlugin/Communication/Dispatcher.cs index 133e5a0f9..1d98e73e0 100644 --- a/Source/Ivxr.SePlugin/Communication/Dispatcher.cs +++ b/Source/Ivxr.SePlugin/Communication/Dispatcher.cs @@ -70,7 +70,7 @@ public void ProcessRequests() } m_requestQueue.Replies.Add( - new RequestItem(request.ClientStream, message: jsonReply)); + new RequestItem(request.ClientStreamWriter, message: jsonReply)); } } diff --git a/Source/Ivxr.SePlugin/Session/SessionDispatcher.cs b/Source/Ivxr.SePlugin/Session/SessionDispatcher.cs index 0511b37c8..3a2ec77a3 100644 --- a/Source/Ivxr.SePlugin/Session/SessionDispatcher.cs +++ b/Source/Ivxr.SePlugin/Session/SessionDispatcher.cs @@ -27,14 +27,14 @@ public void ProcessRequest(RequestItem request) var requestShell = m_jsoner.ToObject>(request.Message); m_sessionController.LoadScenario(requestShell.Arg.ScenarioPath); - PluginServer.ReplyOK(request.ClientStream); + PluginServer.ReplyOK(request.ClientStreamWriter); } catch (Exception ex) { Log.Exception(ex, "Error processing a request"); Log.WriteLine($"Full request: \"{request.Message}\""); PluginServer.ReplyFalse(request - .ClientStream); // Simple error response, details can be learned from the log. + .ClientStreamWriter); // Simple error response, details can be learned from the log. } } }