Skip to content
This repository has been archived by the owner on Feb 20, 2022. It is now read-only.

Commit

Permalink
Run in iterations and perform temp file cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
sandersaares committed Jan 28, 2020
1 parent 0b161ea commit 123af60
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 112 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ System requirements:
* TShark (the command-line variant of [Wireshark](https://www.wireshark.org/#download))
* On Ubuntu, the `tshark` package is sufficient.
* On Windows, install the full version of Wireshark.
* 2 GB of free space in the temporary files directory at runtime.

The `tshark` command must be available in a new terminal. You may need to [register the installation directory in the PATH environment variable](https://www.architectryan.com/2018/03/17/add-to-the-path-on-windows-10/).

Expand Down
2 changes: 1 addition & 1 deletion TzspPacketStreamExporter/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

[assembly: AssemblyCompany("prometheus-net")]
[assembly: AssemblyProduct("TZSP packet stream analysis exporter")]
[assembly: AssemblyVersion("1.1.0")]
[assembly: AssemblyVersion("1.1.1")]
6 changes: 6 additions & 0 deletions TzspPacketStreamExporter/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ static class Constants

public const ushort DefaultPublishPort = 9184;

/// <summary>
/// How many packets we process before we restart TShark.
/// This is necessary because we want to clean up the temporary files TShark generates.
/// </summary>
public const int PacketsPerIteration = 1_000_000;

public static string TsharkExecutableName
{
get
Expand Down
249 changes: 138 additions & 111 deletions TzspPacketStreamExporter/ExporterLogic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,148 +39,157 @@ public async Task RunAsync(CancellationToken cancel)
throw;
}

// We cancel processing if TShark exits or we get our own higher level CT signaled.
var cancelProcessingCts = CancellationTokenSource.CreateLinkedTokenSource(cancel);
var stdoutFinished = new SemaphoreSlim(0, 1);
var stderrFinished = new SemaphoreSlim(0, 1);
_log.Info("Starting TZSP packet stream processing.");

void ConsumeStandardOutput(Stream stdout)
// TShark will exit after N packets have been processed, to enable us to cleanup temp files.
// We just run it in a loop until cancelled or until TShark fails.
while (!cancel.IsCancellationRequested)
{
// Text mode output, each line consisting of:
// 1. Hex string of packet bytes (starting with either outer UDP header or inner TZSP header)
// 2. A space character.
// 3. Type of the data ("eth:ethertype:ip:data" - UDP header, "eth:ethertype:ip:udp:data" - TZSP header)
// 4. A space character.
// 5. The destination UDP port of the TZSP protocol ("udp.dstport") but ONLY if type of data is TZSP header.
// If type of data is UDP header, we need to parse the port ourselves.
// Sometimes (not always) TShark cleans up on its own.
// Better safe than sorry, though!
DeleteTemporaryFiles();

try
{
var reader = new StreamReader(stdout, Encoding.UTF8, leaveOpen: true);
// We cancel processing if TShark exits or we get our own higher level CT signaled.
using var cancelProcessingCts = CancellationTokenSource.CreateLinkedTokenSource(cancel);
var stdoutFinished = new SemaphoreSlim(0, 1);
var stderrFinished = new SemaphoreSlim(0, 1);

while (true)
void ConsumeStandardOutput(Stream stdout)
{
// Text mode output, each line consisting of:
// 1. Hex string of packet bytes (starting with either outer UDP header or inner TZSP header)
// 2. A space character.
// 3. Type of the data ("eth:ethertype:ip:data" - UDP header, "eth:ethertype:ip:udp:data" - TZSP header)
// 4. A space character.
// 5. The destination UDP port of the TZSP protocol ("udp.dstport") but ONLY if type of data is TZSP header.
// If type of data is UDP header, we need to parse the port ourselves.

try
{
var line = reader.ReadLineAsync()
.WithAbandonment(cancelProcessingCts.Token)
.WaitAndUnwrapExceptions();
var reader = new StreamReader(stdout, Encoding.UTF8, leaveOpen: true);

if (line == null)
break; // End of stream.
while (true)
{
var line = reader.ReadLineAsync()
.WithAbandonment(cancelProcessingCts.Token)
.WaitAndUnwrapExceptions();

string packetBytesHex;
string packetType;
if (line == null)
break; // End of stream.

var parts = line.Split(' ');
if (parts.Length != 3)
throw new NotSupportedException("Output line did not have expected number of components.");
string packetBytesHex;
string packetType;

// On some systems there are colons. On others there are not!
// Language/version differences? Whatever, get rid of them.
packetBytesHex = parts[0].Replace(":", "");
packetType = parts[1];
var parts = line.Split(' ');
if (parts.Length != 3)
throw new NotSupportedException("Output line did not have expected number of components.");

var packetBytes = Helpers.Convert.HexStringToByteArray(packetBytesHex);
// On some systems there are colons. On others there are not!
// Language/version differences? Whatever, get rid of them.
packetBytesHex = parts[0].Replace(":", "");
packetType = parts[1];

try
{
if (packetType == "eth:ethertype:ip:data")
{
ProcessTzspPacketWithUdpHeader(packetBytes);
}
else if (packetType == "eth:ethertype:ip:udp:data")
var packetBytes = Helpers.Convert.HexStringToByteArray(packetBytesHex);

try
{
var listenPort = ushort.Parse(parts[2]);
ProcessTzspPacket(packetBytes, listenPort);
if (packetType == "eth:ethertype:ip:data")
{
ProcessTzspPacketWithUdpHeader(packetBytes);
}
else if (packetType == "eth:ethertype:ip:udp:data")
{
var listenPort = ushort.Parse(parts[2]);
ProcessTzspPacket(packetBytes, listenPort);
}
else
{
throw new NotSupportedException("Unexpected packet type: " + packetType);
}
}
else
catch (Exception ex)
{
throw new NotSupportedException("Unexpected packet type: " + packetType);
_log.Error("Ignoring unsupported packet: " + Helpers.Debug.GetAllExceptionMessages(ex));
}
}
catch (Exception ex)
{
_log.Error("Ignoring unsupported packet: " + Helpers.Debug.GetAllExceptionMessages(ex));
}
}
}
catch (OperationCanceledException)
{
// It's OK, we were cancelled because processing is finished.
}
catch (Exception ex)
{
// If we get here, something is fatally wrong with parsing logic or TShark output.
_log.Error(Helpers.Debug.GetAllExceptionMessages(ex));

// This should not happen, so stop everything. Gracefully, so we flush logs.
Environment.ExitCode = -1;
Program.MasterCancellation.Cancel();
}
finally
{
stdoutFinished.Release();
}
};
catch (OperationCanceledException)
{
// It's OK, we were cancelled because processing is finished.
}
catch (Exception ex)
{
// If we get here, something is fatally wrong with parsing logic or TShark output.
_log.Error(Helpers.Debug.GetAllExceptionMessages(ex));

void ConsumeStandardError(Stream stderr)
{
// Only errors should show up here. We will simply log them for now
// - only if tshark exits do we consider it a fatal error.
// This should not happen, so stop everything. Gracefully, so we flush logs.
Environment.ExitCode = -1;
Program.MasterCancellation.Cancel();
}
finally
{
stdoutFinished.Release();
}
};

try
void ConsumeStandardError(Stream stderr)
{
var reader = new StreamReader(stderr, Encoding.UTF8, leaveOpen: true);
// Only errors should show up here. We will simply log them for now
// - only if tshark exits do we consider it a fatal error.

while (true)
try
{
var line = reader.ReadLineAsync()
.WithAbandonment(cancelProcessingCts.Token)
.WaitAndUnwrapExceptions();
var reader = new StreamReader(stderr, Encoding.UTF8, leaveOpen: true);

while (true)
{
var line = reader.ReadLineAsync()
.WithAbandonment(cancelProcessingCts.Token)
.WaitAndUnwrapExceptions();

if (line == null)
break; // End of stream.
if (line == null)
break; // End of stream.

_log.Error(line);
_log.Error(line);
}
}
}
catch (OperationCanceledException)
catch (OperationCanceledException)
{
// It's OK, we were cancelled because processing is finished.
}
finally
{
stderrFinished.Release();
}
};

var tsharkCommand = new ExternalTool
{
// It's OK, we were cancelled because processing is finished.
}
finally
ExecutablePath = Constants.TsharkExecutableName,
ResultHeuristics = ExternalToolResultHeuristics.Linux,
Arguments = @$"-i ""{ListenInterface}"" -f ""{MakeTsharkFilterString()}"" -p -T fields -e data.data -e frame.protocols -e udp.dstport -Eseparator=/s -Q -c {Constants.PacketsPerIteration}",
StandardOutputConsumer = ConsumeStandardOutput,
StandardErrorConsumer = ConsumeStandardError
};

var tshark = tsharkCommand.Start();
var result = await tshark.GetResultAsync(cancel);
cancelProcessingCts.Cancel();

// Wait for output processing threads to finish, so error messages are printed to logs before we exit.
_log.Debug("TShark finished iteration. Waiting for data processing threads to clean up and flush logs.");
await stderrFinished.WaitAsync();
await stdoutFinished.WaitAsync();

if (!cancel.IsCancellationRequested && !result.Succeeded)
{
stderrFinished.Release();
_log.Error("TShark exited with an error result. Review logs above to understand the details of the failure.");
Environment.ExitCode = -1;
break;
}
};

var tsharkCommand = new ExternalTool
{
ExecutablePath = Constants.TsharkExecutableName,
ResultHeuristics = ExternalToolResultHeuristics.Linux,
Arguments = @$"-i ""{ListenInterface}"" -f ""{MakeTsharkFilterString()}"" -p -T fields -e data.data -e frame.protocols -e udp.dstport -Eseparator=/s -Q",
StandardOutputConsumer = ConsumeStandardOutput,
StandardErrorConsumer = ConsumeStandardError
};

var tshark = tsharkCommand.Start();

_log.Info("Starting TZSP packet stream processing.");

var result = await tshark.GetResultAsync(cancel);
cancelProcessingCts.Cancel();

if (!cancel.IsCancellationRequested && !result.Succeeded)
{
_log.Error("TShark exited with an error result. Review logs above to understand the details of the failure.");
Environment.ExitCode = -1;
}

await _metricServer.StopAsync();

// Wait for output processing threads to finish, so error messages are printed to logs before we exit.
_log.Debug("Waiting for data processing threads to clean up and flush logs.");
await stderrFinished.WaitAsync();
await stdoutFinished.WaitAsync();
}

private static async Task VerifyTshark(CancellationToken cancel)
Expand All @@ -202,6 +211,24 @@ private static async Task VerifyTshark(CancellationToken cancel)
throw new NotSupportedException("Unrecognized TShark version/build.");
}

private static void DeleteTemporaryFiles()
{
var files = Directory.GetFiles(Path.GetTempPath(), "wireshark_*.pcapng");

foreach (var file in files)
{
try
{
File.Delete(file);
_log.Debug($"Deleted temporary file: {file}");
}
catch
{
// It's fine - maybe it is in use by a parallel TShark instance!
}
}
}

private static readonly IPNetwork MulticastNetwork = IPNetwork.Parse("224.0.0.0/4");
private static readonly IPNetwork[] PrivateUseNetworks = new[]
{
Expand Down

0 comments on commit 123af60

Please sign in to comment.