Skip to content

Commit

Permalink
Pass around and use StreamWriter for replies.
Browse files Browse the repository at this point in the history
  • Loading branch information
PremekPaska committed Mar 16, 2021
1 parent b2a1770 commit c4cf3d9
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 33 deletions.
53 changes: 28 additions & 25 deletions Source/Ivxr.PlugIndependentLib/Communication/PluginServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private void Serve(TcpListener listener)
try
{
stream.ReadTimeout = ReadTimeoutMs;
ServeConnectedClient(stream);
ServeConnectedClient(stream, client.ReceiveBufferSize);
}
catch (IOException e)
{
Expand All @@ -90,32 +90,38 @@ private void Serve(TcpListener listener)
}
}

private void ServeConnectedClient(NetworkStream stream)
private void ServeConnectedClient(NetworkStream stream, int bufferSize)
{
using (var reader = new StreamReader(stream, Encoding.ASCII))
var encoding = Encoding.UTF8;

using (var reader = new StreamReader(stream, encoding, false, bufferSize))
using (var writer = new StreamWriter(stream, encoding, 4 * bufferSize)) // Replies are bigger.
{
writer.AutoFlush = true;

string message;
while ((message = reader.ReadLine()) != null)
{
m_log.WriteLine($"Read message: {message}");

ProcessMessage(stream, message, out bool disconnected);
if (disconnected)
ProcessMessage(stream, writer, message, out bool disconnect);
if (disconnect)
break;
}
}
}

private void ProcessMessage(NetworkStream clientStream, string message, out bool disconnected)
private void ProcessMessage(NetworkStream clientStream, StreamWriter writer, string message,
out bool disconnect)
{
disconnected = false;
disconnect = false;

if (!message.StartsWith("{\"Cmd\":"))
{
// throw new InvalidDataException("Unexpected message header: " + message);
m_log.WriteLine("Unexpected message header: " + message);
// We disconnect here, because the outer loop can't handle a request without a reply from the queue.
Disconnect(clientStream, out disconnected, reply: false);
FlagDisconnect(writer, out disconnect, reply: false);
return;
}

Expand All @@ -124,57 +130,54 @@ private void ProcessMessage(NetworkStream clientStream, string message, out bool
// ReSharper disable once StringLiteralTypo
if (command.StartsWith("\"AGENTCOM")) // AGENTCOMMAND
{
m_requestQueue.Requests.Enqueue(new RequestItem(clientStream, message));
m_requestQueue.Requests.Enqueue(new RequestItem(writer, message));

// TODO(PP): This tends to block when the message is not added to the queue. Rearchitecture, or at least add a timeout.
WaitForReplyAndSendIt();
}
else if (command.StartsWith("\"SESSION\""))
{
m_sessionDispatcher.ProcessRequest(new RequestItem(clientStream, message));
m_sessionDispatcher.ProcessRequest(new RequestItem(writer, message));
}
else if (command.StartsWith("\"DISCONNECT\""))
{
Disconnect(clientStream, out disconnected, reply: true);
FlagDisconnect(writer, out disconnect, reply: true);
}
else
{
// throw new NotImplementedException("Command unknown or not implemented: " + command);
m_log.WriteLine("Command unknown or not implemented: " + command);
// We disconnect here, because the outer loop can't handle a request without a reply from the queue.
Disconnect(clientStream, out disconnected, reply: false);
FlagDisconnect(writer, out disconnect, reply: false);
}
}

private void Disconnect(NetworkStream clientStream, out bool disconnected, bool reply = true)
private void FlagDisconnect(StreamWriter writer, out bool disconnect, bool reply = true)
{
Reply(clientStream, reply.ToString());
clientStream.Close(timeout: 100); // ms
disconnected = true;
Reply(writer, reply.ToString());
disconnect = true;
}

private void WaitForReplyAndSendIt()
{
// TODO(PP): consider adding a timeout (blocks when no reply ready)
var reply = m_requestQueue.Replies.Take();
Reply(reply.ClientStream, reply.Message);
Reply(reply.ClientStreamWriter, reply.Message);
}

public static void Reply(NetworkStream clientStream, string reply)
public static void Reply(StreamWriter writer, string reply)
{
// TODO(PP): avoid allocation of a new buffer each time
var replyBuffer = Encoding.ASCII.GetBytes(reply + '\n');
clientStream.Write(replyBuffer, 0, replyBuffer.Length);
writer.WriteLine(reply);
}

public static void ReplyOK(NetworkStream clientStream)
public static void ReplyOK(StreamWriter clientStreamWriter)
{
Reply(clientStream, "true");
Reply(clientStreamWriter, "true");
}

public static void ReplyFalse(NetworkStream clientStream)
public static void ReplyFalse(StreamWriter clientStreamWriter)
{
Reply(clientStream, "false");
Reply(clientStreamWriter, "false");
}
}
}
11 changes: 6 additions & 5 deletions Source/Ivxr.PlugIndependentLib/Communication/RequestItem.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
using System.Linq;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Text.RegularExpressions;

namespace Iv4xr.PluginLib
{
/// <summary>
/// Request item of the request queue, carring its context.
/// Request item of the request queue, carrying its context.
/// </summary>
public class RequestItem
{
public RequestItem(NetworkStream clientStream, string message)
public RequestItem(StreamWriter clientStreamWriter, string message)
{
ClientStream = clientStream;
ClientStreamWriter = clientStreamWriter;
Message = message;
}

public NetworkStream ClientStream { get; }
public StreamWriter ClientStreamWriter { get; }
public string Message { get; } // Maybe we'll add something more structured later.

public string GetCmd()
Expand Down
2 changes: 1 addition & 1 deletion Source/Ivxr.SePlugin/Communication/Dispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void ProcessRequests()
}

m_requestQueue.Replies.Add(
new RequestItem(request.ClientStream, message: jsonReply));
new RequestItem(request.ClientStreamWriter, message: jsonReply));
}
}

Expand Down
4 changes: 2 additions & 2 deletions Source/Ivxr.SePlugin/Session/SessionDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ public void ProcessRequest(RequestItem request)
var requestShell = m_jsoner.ToObject<SeRequestShell<SessionCommand>>(request.Message);

m_sessionController.LoadScenario(requestShell.Arg.ScenarioPath);
PluginServer.ReplyOK(request.ClientStream);
PluginServer.ReplyOK(request.ClientStreamWriter);
}
catch (Exception ex)
{
Log.Exception(ex, "Error processing a request");
Log.WriteLine($"Full request: \"{request.Message}\"");
PluginServer.ReplyFalse(request
.ClientStream); // Simple error response, details can be learned from the log.
.ClientStreamWriter); // Simple error response, details can be learned from the log.
}
}
}
Expand Down

0 comments on commit c4cf3d9

Please sign in to comment.