Skip to content

Commit

Permalink
[EN-6646] Add LatencyTest tool (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
stdcion authored Jun 8, 2023
1 parent 358d4e8 commit dff29c4
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 11 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ API.
We expect the new repository to go into production in Q2’2023.
At the same time, the old version will be considered deprecated, and at the end of 2024, we plan to end the service.
If you’re already our customer and have difficulty with a future transition, please contact us via
our [customer portal](https://jira.in.devexperts.com/servicedesk/customer/portal/1/create/122).
our [customer portal](https://jira.in.devexperts.com/servicedesk/customer/portal/1).

### Future Development

Expand Down
8 changes: 7 additions & 1 deletion src/DxFeed.Graal.Net.Tools/Attributes/ToolInfoAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ namespace DxFeed.Graal.Net.Tools.Attributes;

public class ToolInfoAttribute : VerbAttribute
{
private string? _description;

public ToolInfoAttribute(string name)
: base(name)
{
Expand All @@ -21,7 +23,11 @@ public string ShortDescription
set => HelpText = value;
}

public string? Description { get; set; }
public string? Description
{
get => _description ?? ShortDescription;
set => _description = value;
}

public string[]? Usage { get; set; }

Expand Down
30 changes: 30 additions & 0 deletions src/DxFeed.Graal.Net.Tools/LatencyTest/LatencyTestArg.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// <copyright file="LatencyTestArg.cs" company="Devexperts LLC">
// Copyright © 2022 Devexperts LLC. All rights reserved.
// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
// </copyright>

using System;
using CommandLine;
using CommandLine.Text;
using DxFeed.Graal.Net.Tools.Arguments;

namespace DxFeed.Graal.Net.Tools.LatencyTest;

public class LatencyTestArgs : IAddressArg, ITypesArg, ISymbolsArg, IPropertyArg
{
public string Address { get; set; } = null!;

public string? Types { get; set; } = null!;

public string? Symbols { get; set; } = null!;

public string? Properties { get; set; } = null!;

[Option("force-stream", Required = false,
HelpText = "Enforces a streaming contract for subscription. The StreamFeed role is used instead of Feed.")]
public bool ForceStream { get; set; } = false;

[Option("interval", Required = false, HelpText = "Measurement interval in seconds.")]
public int Interval { get; set; } = 2;
}
256 changes: 256 additions & 0 deletions src/DxFeed.Graal.Net.Tools/LatencyTest/LatencyTestTool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
// <copyright file="LatencyTestTool.cs" company="Devexperts LLC">
// Copyright © 2022 Devexperts LLC. All rights reserved.
// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
// </copyright>

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Threading;
using DxFeed.Graal.Net.Api;
using DxFeed.Graal.Net.Events;
using DxFeed.Graal.Net.Events.Market;
using DxFeed.Graal.Net.Tools.Attributes;
using DxFeed.Graal.Net.Utils;

namespace DxFeed.Graal.Net.Tools.LatencyTest;

// ReSharper disable NonAtomicCompoundOperator
// ReSharper disable AccessToDisposedClosure

[ToolInfo(
"LatencyTest",
ShortDescription = "Connects to the specified address(es) and calculates latency.",
Usage = new[] { "LatencyTest <address> <types> <symbols> [<options>]" })]
public class LatencyTestTool : AbstractTool<LatencyTestArgs>
{
private sealed class Diagnostic : IDisposable
{
private static readonly string DiagnosticHeader = PlatformUtils.PlatformDiagInfo;
private static readonly NumberFormatInfo SpaceNumFormat = new() { NumberGroupSeparator = " " };

private readonly Timer _timer;

private readonly Stopwatch _timerDiff = new();
private readonly Stopwatch _runningDiff = new();
private long _eventCounter;
private long _listenerCounter;

private double _min = double.NaN;
private double _mean = double.NaN;
private double _max = double.NaN;
private double _percentile = double.NaN;
private double _stdDev = double.NaN;
private double _stdErr = double.NaN;
private long _sampleSize;

private readonly ConcurrentSet<string?> _symbols = new();
private readonly ConcurrentBag<long> _deltaTime = new();

private readonly TimeSpan _measurementPeriod;

public Diagnostic(TimeSpan measurementPeriod)
{
_timerDiff.Restart();
_runningDiff.Restart();
_measurementPeriod = measurementPeriod;
_timer = new Timer(TimerCallback, null, measurementPeriod, measurementPeriod);
}

public void AddEventCounter(long value) =>
Interlocked.Add(ref _eventCounter, value);

public void AddListenerCounter(long value) =>
Interlocked.Add(ref _listenerCounter, value);

public void HandleEvents(IEnumerable<IEventType> value)
{
var time = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
long validEvent = 0;
foreach (var e in value)
{
long deltaTime;
switch (e)
{
case Quote quote:
deltaTime = time - quote.Time;
++validEvent;
_deltaTime.Add(deltaTime);
_symbols.Add(e.EventSymbol);
break;
case Trade trade:
deltaTime = time - trade.Time;
++validEvent;
_deltaTime.Add(deltaTime);
_symbols.Add(e.EventSymbol);
break;
case TradeETH tradeETH:
deltaTime = time - tradeETH.Time;
++validEvent;
_deltaTime.Add(deltaTime);
_symbols.Add(e.EventSymbol);
break;
case TimeAndSale timeAndSale:
if (!timeAndSale.IsNew || !timeAndSale.IsValidTick)
{
continue;
}

deltaTime = time - timeAndSale.Time;
++validEvent;
_deltaTime.Add(deltaTime);
_symbols.Add(e.EventSymbol);
break;
}
}

AddEventCounter(validEvent);
}

public void Dispose() =>
_timer.Dispose();

private double GetEventsPerSec() =>
GetAndResetEventCounter() / _timerDiff.Elapsed.TotalSeconds;

private long GetAndResetEventCounter() =>
Interlocked.Exchange(ref _eventCounter, 0);

private static string FormatDouble(double value) =>
double.IsNaN(value) ? "---" : value.ToString("N2", SpaceNumFormat);

private void TimerCallback(object? _)
{
var eventsPerSec = GetEventsPerSec();

if (!_deltaTime.IsEmpty)
{
var deltas = _deltaTime.ToList();
_deltaTime.Clear();
_min = CalcMin(deltas);
_mean = CalcMean(deltas);
_max = CalcMax(deltas);
_percentile = CalcPercentile(deltas.ToArray(), 0.99);
_stdDev = CalcStdDev(deltas);
_stdErr = CalcStdErr(deltas, _stdDev);
_sampleSize = deltas.Count;
}

var uniqueSymbols = _symbols.Count;
_symbols.Clear();

Console.WriteLine();
Console.WriteLine(DiagnosticHeader);
Console.WriteLine(@"----------------------------------------------");
Console.WriteLine(@$" Rate of events (avg) : {FormatDouble(eventsPerSec)} (events/s)");
Console.WriteLine(@$" Rate of unique symbols : {uniqueSymbols} (symbols/interval)");
Console.WriteLine(@$" Min : {FormatDouble(_min)} (ms)");
Console.WriteLine(@$" Max : {FormatDouble(_max)} (ms)");
Console.WriteLine(@$" 99th percentile : {FormatDouble(_percentile)} (ms)");
Console.WriteLine(@$" Mean : {FormatDouble(_mean)} (ms)");
Console.WriteLine(@$" StdDev : {FormatDouble(_stdDev)} (ms)");
Console.WriteLine(@$" Error : {FormatDouble(_stdErr)} (ms)");
Console.WriteLine(@$" Sample size (N) : {_sampleSize} (events)");
Console.WriteLine(@$" Measurement interval : {_measurementPeriod.Seconds} (s)");
Console.WriteLine(@$" Running time : {_runningDiff.Elapsed}");

_min = double.NaN;
_mean = double.NaN;
_max = double.NaN;
_percentile = double.NaN;
_stdDev = double.NaN;
_stdErr = double.NaN;
_sampleSize = 0;

_timerDiff.Restart();
}

private static double CalcPercentile(long[] sequence, double excelPercentile)
{
Array.Sort(sequence);
var N = sequence.Length;
var n = ((N - 1) * excelPercentile) + 1;
if (n.Equals(1d))
{
return sequence[0];
}

if (n.Equals(N))
{
return sequence[N - 1];
}

var k = (int)n;
var d = n - k;
return sequence[k - 1] + (d * (sequence[k] - sequence[k - 1]));
}

private static double CalcMin(List<long> values) =>
values.Min();

private static double CalcMean(List<long> values) =>
values.Average();

private static double CalcMax(List<long> values) =>
values.Max();

private static double CalcStdDev(List<long> values)
{
double stdDev = 0;
var count = values.Count;
if (count <= 1)
{
return stdDev;
}

count -= 1;

var avg = values.Average();
var sum = values.Sum(d => (d - avg) * (d - avg));
stdDev = Math.Sqrt(sum / count);
return stdDev;
}

private static double CalcStdErr(IEnumerable<long> values, double stdDev)
{
var count = values.Count();
return stdDev / Math.Sqrt(count);
}
}

public override void Run(LatencyTestArgs args)
{
using var endpoint = DXEndpoint
.NewBuilder()
.WithRole(args.ForceStream ? DXEndpoint.Role.StreamFeed : DXEndpoint.Role.Feed)
.WithProperty(DXEndpoint.DXFeedWildcardEnableProperty, "true") // Enabled by default.
.WithProperties(ParseProperties(args.Properties))
.WithName(nameof(LatencyTestTool))
.Build();

using var sub = endpoint
.GetFeed()
.CreateSubscription(ParseEventTypes(args.Types!));

var measurementPeriod = new TimeSpan(0, 0, args.Interval);
using var diagnostic = new Diagnostic(measurementPeriod);


sub.AddEventListener(events =>
{
diagnostic.AddListenerCounter(1);
diagnostic.HandleEvents(events);
});

sub.AddSymbols(ParseSymbols(args.Symbols!).ToList());

endpoint.Connect(args.Address);

endpoint.AwaitNotConnected();
endpoint.CloseAndAwaitTermination();
}
}
16 changes: 8 additions & 8 deletions src/DxFeed.Graal.Net.Tools/PerfTest/PerfTestTool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,14 @@ private void TimerCallback(object? _)
Console.WriteLine();
Console.WriteLine(DiagnosticHeader);
Console.WriteLine(@"----------------------------------------------");
Console.WriteLine(@$" Events : {FormatDouble(eventsPerSec)} (per/sec)");
Console.WriteLine(@$" Listener Calls : {FormatDouble(listenerCallsPerSec)} (per/sec)");
Console.WriteLine(@$" Average Number of Events : {FormatDouble(eventsPerSec / listenerCallsPerSec)}");
Console.WriteLine(@$" Current Memory Usage : {currentMemoryUsage} (Mbyte)");
Console.WriteLine(@$" Peak Memory Usage : {_peakMemoryUsage} (Mbyte)");
Console.WriteLine(@$" Current CPU Usage : {currentCpuUsage:P2}");
Console.WriteLine(@$" Peak CPU Usage : {_peakCpuUsage:P2}");
Console.WriteLine(@$" Running Time : {_runningDiff.Elapsed}");
Console.WriteLine(@$" Rate of events (avg) : {FormatDouble(eventsPerSec)} (events/s)");
Console.WriteLine(@$" Rate of listener calls : {FormatDouble(listenerCallsPerSec)} (calls/s)");
Console.WriteLine(@$" Number of events in call (avg) : {FormatDouble(eventsPerSec / listenerCallsPerSec)} (events)");
Console.WriteLine(@$" Current memory usage : {currentMemoryUsage} (Mbyte)");
Console.WriteLine(@$" Peak memory usage : {_peakMemoryUsage} (Mbyte)");
Console.WriteLine(@$" Current CPU usage : {currentCpuUsage:P2}");
Console.WriteLine(@$" Peak CPU usage : {_peakCpuUsage:P2}");
Console.WriteLine(@$" Running time : {_runningDiff.Elapsed}");

_timerDiff.Restart();
}
Expand Down
2 changes: 1 addition & 1 deletion src/DxFeed.Graal.Net/Utils/ConcurrentSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace DxFeed.Graal.Net.Utils;
/// for more consistent.
/// </summary>
/// <typeparam name="T">The type of elements in the set.</typeparam>
internal class ConcurrentSet<T> : ICollection<T>
public class ConcurrentSet<T> : ICollection<T>
where T : notnull
{
private readonly ConcurrentDictionary<T, int> _inner;
Expand Down

0 comments on commit dff29c4

Please sign in to comment.