Skip to content
This repository has been archived by the owner on Aug 22, 2023. It is now read-only.

Commit

Permalink
moved websocket data streaming to game thread, in order to avoid pote…
Browse files Browse the repository at this point in the history
…ntially crashing KSP on exit
  • Loading branch information
richardbunt committed Jul 4, 2015
1 parent 757c9ec commit 968ca67
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 42 deletions.
2 changes: 2 additions & 0 deletions Telemachus/Telemachus.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@
<Compile Include="src\DataLinkResponsibility.cs" />
<Compile Include="src\ElseResponsibility.cs" />
<Compile Include="src\IOPageResponsibility.cs" />
<Compile Include="src\IterationToEvent.cs" />
<Compile Include="src\KSPWebSocketService.cs" />
<Compile Include="src\PluginLogger.cs" />
<Compile Include="src\TelemachusBehaviour.cs" />
<Compile Include="src\TelemachusPartModules.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="src\UpdateTimer.cs" />
<Compile Include="src\UpLinkDownLinkRate.cs" />
<Compile Include="src\VesselChangeDetector.cs" />
</ItemGroup>
Expand Down
22 changes: 22 additions & 0 deletions Telemachus/src/IterationToEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Timers;
using UnityEngine;

namespace Telemachus
{
public class IterationToEvent<E> where E : EventArgs
{
public event EventHandler<E> Iterated;

public void update(E eventArgs)
{
if (Iterated != null)
{
Iterated(this, eventArgs);
}
}
}
}
77 changes: 41 additions & 36 deletions Telemachus/src/KSPWebSocketService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,43 +27,46 @@ public class KSPWebSocketService : IWebSocketService

private Regex matchJSONAttributes = new Regex(@"[\{""|,""|,""]([^"":]*)"":([^:]*)[,|\}]");

private Timer streamTimer = new Timer();
private IterationToEvent<UpdateTimerEventArgs> gameLoopEvent = null;
private UpdateTimer streamTimer = new UpdateTimer();

private int streamRate = 500;
HashSet<string> subscriptions = new HashSet<string>();
HashSet<string> toRun = new HashSet<string>();
readonly private System.Object subscriptionLock = new System.Object();

public KSPWebSocketService(IKSPAPI kspAPI, Servers.AsynchronousServer.ClientConnection clientConnection)
: this(kspAPI)
public KSPWebSocketService(IKSPAPI kspAPI, Servers.AsynchronousServer.ClientConnection clientConnection, IterationToEvent<UpdateTimerEventArgs> gameLoopEvent)
: this(kspAPI, gameLoopEvent)
{
this.clientConnection = clientConnection;
streamTimer.Interval = streamRate;
streamTimer.Elapsed += streamData;
streamTimer.Enabled = true;
}

public KSPWebSocketService(IKSPAPI kspAPI)
public KSPWebSocketService(IKSPAPI kspAPI, IterationToEvent<UpdateTimerEventArgs> gameLoopEvent)
{
this.kspAPI = kspAPI;
this.gameLoopEvent = gameLoopEvent;
gameLoopEvent.Iterated += streamTimer.update;
}

private void streamData(object sender, ElapsedEventArgs e)
private void streamData(object sender, UpdateTimerEventArgs e)
{
streamTimer.Interval = streamRate;

DataSources dataSources = new DataSources();

if (toRun.Count + subscriptions.Count > 0)
lock (subscriptionLock)
{
try
streamTimer.Interval = streamRate;

if (toRun.Count + subscriptions.Count > 0)
{
List<string> entries = new List<string>();
try
{
List<string> entries = new List<string>();

APIEntry entry = null;
APIEntry entry = null;

lock (subscriptionLock)
{
dataSources.vessel = kspAPI.getVessel();

//Only parse the paused argument if the active vessel is null
Expand Down Expand Up @@ -101,22 +104,22 @@ private void streamData(object sender, ElapsedEventArgs e)
{
sendNullMessage();
}
}
}
catch(NullReferenceException)
{
PluginLogger.debug("Swallowing null reference exception, potentially due to async game state change.");
sendNullMessage();
}
catch (Exception ex)
{
PluginLogger.debug("Closing socket due to potential client disconnect:" + ex.GetType().ToString());
close();
}
}
catch(NullReferenceException)
else
{
PluginLogger.debug("Swallowing null reference exception, potentially due to async game state change.");
sendNullMessage();
}
catch (Exception ex)
{
PluginLogger.debug("Closing socket due to potential client disconnect:" + ex.GetType().ToString());
close();
}
}
else
{
sendNullMessage();
}
}

Expand Down Expand Up @@ -159,20 +162,22 @@ public void OpCodeText(object sender, FrameEventArgs e)
private void rate(string p)
{
int proposedRate = 0;

try
lock (subscriptionLock)
{
proposedRate = int.Parse(p);
try
{
proposedRate = int.Parse(p);

if (proposedRate >= MAX_STREAM_RATE)
if (proposedRate >= MAX_STREAM_RATE)
{
streamRate = proposedRate;
}
}
catch (Exception)
{
streamRate = proposedRate;
PluginLogger.debug("Swallowing integer parse failure when setting stream rate.");
}
}
catch (Exception)
{
PluginLogger.debug("Swallowing integer parse failure when setting stream rate.");
}
}

private string[] splitString(string p)
Expand Down Expand Up @@ -228,13 +233,13 @@ public void Shutdown(EventArgs e)

private void close()
{
streamTimer.Stop();
gameLoopEvent.Iterated -= streamTimer.update;
clientConnection.tryShutdown();
}

public IWebSocketService buildService(Servers.AsynchronousServer.ClientConnection clientConnection)
{
return new KSPWebSocketService(kspAPI, clientConnection);
return new KSPWebSocketService(kspAPI, clientConnection, gameLoopEvent);
}

#region Unused Callbacks
Expand Down
12 changes: 11 additions & 1 deletion Telemachus/src/TelemachusBehaviour.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Timers;
using UnityEngine;

namespace Telemachus
{
class TelemachusBehaviour : MonoBehaviour
{
#region Constants

private float MICRO_SECONDS = 1000.0f;

#endregion

#region Fields

public static GameObject instance;
Expand All @@ -27,6 +34,7 @@ class TelemachusBehaviour : MonoBehaviour
private static IOPageResponsibility ioPageResponsibility = null;
private static VesselChangeDetector vesselChangeDetector = null;
private static KSPWebSocketService kspWebSocketService = null;
private static IterationToEvent<UpdateTimerEventArgs> kspWebSocketDataStreamer = new IterationToEvent<UpdateTimerEventArgs>();
private static bool isPartless = false;

static public string getServerPrimaryIPAddress()
Expand Down Expand Up @@ -64,7 +72,8 @@ static private void startDataLink()
webSocketconfig.bufferSize = 300;
webSocketServer = new Servers.MinimalWebSocketServer.Server(webSocketconfig);
webSocketServer.ServerNotify += WebSocketServerNotify;
kspWebSocketService = new KSPWebSocketService(new KSPAPI(JSONFormatterProvider.Instance, vesselChangeDetector, serverConfig));
kspWebSocketService = new KSPWebSocketService(new KSPAPI(JSONFormatterProvider.Instance, vesselChangeDetector, serverConfig),
kspWebSocketDataStreamer);
webSocketServer.addWebSocketService("/datalink", kspWebSocketService);
webSocketServer.subscribeToHTTPForStealing(server);

Expand Down Expand Up @@ -187,6 +196,7 @@ public void Update()
if (FlightGlobals.fetch != null)
{
vesselChangeDetector.update(FlightGlobals.ActiveVessel);
kspWebSocketDataStreamer.update(new UpdateTimerEventArgs(Time.time * MICRO_SECONDS));
}
else
{
Expand Down
57 changes: 57 additions & 0 deletions Telemachus/src/UpdateTimer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Timers;
using UnityEngine;

namespace Telemachus
{
public class UpdateTimer
{
public event EventHandler<UpdateTimerEventArgs> Elapsed;
private float lastEventTime = 0.0f;

private float interval = 0.0f;
public float Interval
{
get { return interval; }
set { interval = value; }
}

private bool enabled = false;
public bool Enabled
{
get { return enabled; }
set { enabled = value; }
}

public void update(object sender, UpdateTimerEventArgs timeElapsedEventArgs)
{
if ((timeElapsedEventArgs.Time - lastEventTime > interval) && Enabled)
{
if (Elapsed != null)
{
Elapsed(this, timeElapsedEventArgs);
}

lastEventTime = timeElapsedEventArgs.Time;
}
}
}

public class UpdateTimerEventArgs : EventArgs
{
private float time = 0.0f;

public float Time
{
get { return time; }
}

public UpdateTimerEventArgs(float time)
{
this.time = time;
}
}
}
10 changes: 5 additions & 5 deletions TelemachusTest/src/TelemachusWebSocketServerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ public void run()
VesselChangeDetector vesselChangeDetector = new VesselChangeDetector(false);
IKSPAPI kspAPI = new DummyKSPAPI(JSONFormatterProvider.Instance, vesselChangeDetector, config);

webSocketServer.addWebSocketService("/server", new KSPWebSocketService(kspAPI));
webSocketServer.subscribeToHTTPForStealing(server);
//webSocketServer.addWebSocketService("/server", new KSPWebSocketService(kspAPI,));
//webSocketServer.subscribeToHTTPForStealing(server);

// start the HTTP server
server.startServing();
Console.Read();
server.stopServing();
//server.startServing();
//Console.Read();
//server.stopServing();
}

static void server_ServerNotify(object sender, Servers.NotifyEventArgs e)
Expand Down

0 comments on commit 968ca67

Please sign in to comment.