diff --git a/README.md b/README.md index 6d20068..38c4bce 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ System requirements: * TShark (the command-line variant of [Wireshark](https://www.wireshark.org/#download)) * On Ubuntu, the `tshark` package is sufficient. * On Windows, install the full version of Wireshark. +* 2 GB of free space in the temporary files directory at runtime. The `tshark` command must be available in a new terminal. You may need to [register the installation directory in the PATH environment variable](https://www.architectryan.com/2018/03/17/add-to-the-path-on-windows-10/). diff --git a/TzspPacketStreamExporter/AssemblyInfo.cs b/TzspPacketStreamExporter/AssemblyInfo.cs index e1dbf93..85fa7cd 100644 --- a/TzspPacketStreamExporter/AssemblyInfo.cs +++ b/TzspPacketStreamExporter/AssemblyInfo.cs @@ -2,4 +2,4 @@ [assembly: AssemblyCompany("prometheus-net")] [assembly: AssemblyProduct("TZSP packet stream analysis exporter")] -[assembly: AssemblyVersion("1.1.0")] \ No newline at end of file +[assembly: AssemblyVersion("1.1.1")] \ No newline at end of file diff --git a/TzspPacketStreamExporter/Constants.cs b/TzspPacketStreamExporter/Constants.cs index 36bc312..7cbacad 100644 --- a/TzspPacketStreamExporter/Constants.cs +++ b/TzspPacketStreamExporter/Constants.cs @@ -9,6 +9,12 @@ static class Constants public const ushort DefaultPublishPort = 9184; + /// + /// How many packets we process before we restart TShark. + /// This is necessary because we want to clean up the temporary files TShark generates. + /// + public const int PacketsPerIteration = 1_000_000; + public static string TsharkExecutableName { get diff --git a/TzspPacketStreamExporter/ExporterLogic.cs b/TzspPacketStreamExporter/ExporterLogic.cs index 741fcfa..1888a5b 100644 --- a/TzspPacketStreamExporter/ExporterLogic.cs +++ b/TzspPacketStreamExporter/ExporterLogic.cs @@ -39,148 +39,157 @@ public async Task RunAsync(CancellationToken cancel) throw; } - // We cancel processing if TShark exits or we get our own higher level CT signaled. - var cancelProcessingCts = CancellationTokenSource.CreateLinkedTokenSource(cancel); - var stdoutFinished = new SemaphoreSlim(0, 1); - var stderrFinished = new SemaphoreSlim(0, 1); + _log.Info("Starting TZSP packet stream processing."); - void ConsumeStandardOutput(Stream stdout) + // TShark will exit after N packets have been processed, to enable us to cleanup temp files. + // We just run it in a loop until cancelled or until TShark fails. + while (!cancel.IsCancellationRequested) { - // Text mode output, each line consisting of: - // 1. Hex string of packet bytes (starting with either outer UDP header or inner TZSP header) - // 2. A space character. - // 3. Type of the data ("eth:ethertype:ip:data" - UDP header, "eth:ethertype:ip:udp:data" - TZSP header) - // 4. A space character. - // 5. The destination UDP port of the TZSP protocol ("udp.dstport") but ONLY if type of data is TZSP header. - // If type of data is UDP header, we need to parse the port ourselves. + // Sometimes (not always) TShark cleans up on its own. + // Better safe than sorry, though! + DeleteTemporaryFiles(); - try - { - var reader = new StreamReader(stdout, Encoding.UTF8, leaveOpen: true); + // We cancel processing if TShark exits or we get our own higher level CT signaled. + using var cancelProcessingCts = CancellationTokenSource.CreateLinkedTokenSource(cancel); + var stdoutFinished = new SemaphoreSlim(0, 1); + var stderrFinished = new SemaphoreSlim(0, 1); - while (true) + void ConsumeStandardOutput(Stream stdout) + { + // Text mode output, each line consisting of: + // 1. Hex string of packet bytes (starting with either outer UDP header or inner TZSP header) + // 2. A space character. + // 3. Type of the data ("eth:ethertype:ip:data" - UDP header, "eth:ethertype:ip:udp:data" - TZSP header) + // 4. A space character. + // 5. The destination UDP port of the TZSP protocol ("udp.dstport") but ONLY if type of data is TZSP header. + // If type of data is UDP header, we need to parse the port ourselves. + + try { - var line = reader.ReadLineAsync() - .WithAbandonment(cancelProcessingCts.Token) - .WaitAndUnwrapExceptions(); + var reader = new StreamReader(stdout, Encoding.UTF8, leaveOpen: true); - if (line == null) - break; // End of stream. + while (true) + { + var line = reader.ReadLineAsync() + .WithAbandonment(cancelProcessingCts.Token) + .WaitAndUnwrapExceptions(); - string packetBytesHex; - string packetType; + if (line == null) + break; // End of stream. - var parts = line.Split(' '); - if (parts.Length != 3) - throw new NotSupportedException("Output line did not have expected number of components."); + string packetBytesHex; + string packetType; - // On some systems there are colons. On others there are not! - // Language/version differences? Whatever, get rid of them. - packetBytesHex = parts[0].Replace(":", ""); - packetType = parts[1]; + var parts = line.Split(' '); + if (parts.Length != 3) + throw new NotSupportedException("Output line did not have expected number of components."); - var packetBytes = Helpers.Convert.HexStringToByteArray(packetBytesHex); + // On some systems there are colons. On others there are not! + // Language/version differences? Whatever, get rid of them. + packetBytesHex = parts[0].Replace(":", ""); + packetType = parts[1]; - try - { - if (packetType == "eth:ethertype:ip:data") - { - ProcessTzspPacketWithUdpHeader(packetBytes); - } - else if (packetType == "eth:ethertype:ip:udp:data") + var packetBytes = Helpers.Convert.HexStringToByteArray(packetBytesHex); + + try { - var listenPort = ushort.Parse(parts[2]); - ProcessTzspPacket(packetBytes, listenPort); + if (packetType == "eth:ethertype:ip:data") + { + ProcessTzspPacketWithUdpHeader(packetBytes); + } + else if (packetType == "eth:ethertype:ip:udp:data") + { + var listenPort = ushort.Parse(parts[2]); + ProcessTzspPacket(packetBytes, listenPort); + } + else + { + throw new NotSupportedException("Unexpected packet type: " + packetType); + } } - else + catch (Exception ex) { - throw new NotSupportedException("Unexpected packet type: " + packetType); + _log.Error("Ignoring unsupported packet: " + Helpers.Debug.GetAllExceptionMessages(ex)); } } - catch (Exception ex) - { - _log.Error("Ignoring unsupported packet: " + Helpers.Debug.GetAllExceptionMessages(ex)); - } } - } - catch (OperationCanceledException) - { - // It's OK, we were cancelled because processing is finished. - } - catch (Exception ex) - { - // If we get here, something is fatally wrong with parsing logic or TShark output. - _log.Error(Helpers.Debug.GetAllExceptionMessages(ex)); - - // This should not happen, so stop everything. Gracefully, so we flush logs. - Environment.ExitCode = -1; - Program.MasterCancellation.Cancel(); - } - finally - { - stdoutFinished.Release(); - } - }; + catch (OperationCanceledException) + { + // It's OK, we were cancelled because processing is finished. + } + catch (Exception ex) + { + // If we get here, something is fatally wrong with parsing logic or TShark output. + _log.Error(Helpers.Debug.GetAllExceptionMessages(ex)); - void ConsumeStandardError(Stream stderr) - { - // Only errors should show up here. We will simply log them for now - // - only if tshark exits do we consider it a fatal error. + // This should not happen, so stop everything. Gracefully, so we flush logs. + Environment.ExitCode = -1; + Program.MasterCancellation.Cancel(); + } + finally + { + stdoutFinished.Release(); + } + }; - try + void ConsumeStandardError(Stream stderr) { - var reader = new StreamReader(stderr, Encoding.UTF8, leaveOpen: true); + // Only errors should show up here. We will simply log them for now + // - only if tshark exits do we consider it a fatal error. - while (true) + try { - var line = reader.ReadLineAsync() - .WithAbandonment(cancelProcessingCts.Token) - .WaitAndUnwrapExceptions(); + var reader = new StreamReader(stderr, Encoding.UTF8, leaveOpen: true); + + while (true) + { + var line = reader.ReadLineAsync() + .WithAbandonment(cancelProcessingCts.Token) + .WaitAndUnwrapExceptions(); - if (line == null) - break; // End of stream. + if (line == null) + break; // End of stream. - _log.Error(line); + _log.Error(line); + } } - } - catch (OperationCanceledException) + catch (OperationCanceledException) + { + // It's OK, we were cancelled because processing is finished. + } + finally + { + stderrFinished.Release(); + } + }; + + var tsharkCommand = new ExternalTool { - // It's OK, we were cancelled because processing is finished. - } - finally + ExecutablePath = Constants.TsharkExecutableName, + ResultHeuristics = ExternalToolResultHeuristics.Linux, + Arguments = @$"-i ""{ListenInterface}"" -f ""{MakeTsharkFilterString()}"" -p -T fields -e data.data -e frame.protocols -e udp.dstport -Eseparator=/s -Q -c {Constants.PacketsPerIteration}", + StandardOutputConsumer = ConsumeStandardOutput, + StandardErrorConsumer = ConsumeStandardError + }; + + var tshark = tsharkCommand.Start(); + var result = await tshark.GetResultAsync(cancel); + cancelProcessingCts.Cancel(); + + // Wait for output processing threads to finish, so error messages are printed to logs before we exit. + _log.Debug("TShark finished iteration. Waiting for data processing threads to clean up and flush logs."); + await stderrFinished.WaitAsync(); + await stdoutFinished.WaitAsync(); + + if (!cancel.IsCancellationRequested && !result.Succeeded) { - stderrFinished.Release(); + _log.Error("TShark exited with an error result. Review logs above to understand the details of the failure."); + Environment.ExitCode = -1; + break; } - }; - - var tsharkCommand = new ExternalTool - { - ExecutablePath = Constants.TsharkExecutableName, - ResultHeuristics = ExternalToolResultHeuristics.Linux, - Arguments = @$"-i ""{ListenInterface}"" -f ""{MakeTsharkFilterString()}"" -p -T fields -e data.data -e frame.protocols -e udp.dstport -Eseparator=/s -Q", - StandardOutputConsumer = ConsumeStandardOutput, - StandardErrorConsumer = ConsumeStandardError - }; - - var tshark = tsharkCommand.Start(); - - _log.Info("Starting TZSP packet stream processing."); - - var result = await tshark.GetResultAsync(cancel); - cancelProcessingCts.Cancel(); - - if (!cancel.IsCancellationRequested && !result.Succeeded) - { - _log.Error("TShark exited with an error result. Review logs above to understand the details of the failure."); - Environment.ExitCode = -1; } await _metricServer.StopAsync(); - - // Wait for output processing threads to finish, so error messages are printed to logs before we exit. - _log.Debug("Waiting for data processing threads to clean up and flush logs."); - await stderrFinished.WaitAsync(); - await stdoutFinished.WaitAsync(); } private static async Task VerifyTshark(CancellationToken cancel) @@ -202,6 +211,24 @@ private static async Task VerifyTshark(CancellationToken cancel) throw new NotSupportedException("Unrecognized TShark version/build."); } + private static void DeleteTemporaryFiles() + { + var files = Directory.GetFiles(Path.GetTempPath(), "wireshark_*.pcapng"); + + foreach (var file in files) + { + try + { + File.Delete(file); + _log.Debug($"Deleted temporary file: {file}"); + } + catch + { + // It's fine - maybe it is in use by a parallel TShark instance! + } + } + } + private static readonly IPNetwork MulticastNetwork = IPNetwork.Parse("224.0.0.0/4"); private static readonly IPNetwork[] PrivateUseNetworks = new[] {