Skip to content

Commit

Permalink
Merge pull request #46 from QuantConnect/bug-subscription-handling-wh…
Browse files Browse the repository at this point in the history
…ile-restarting

Correctly handle subscriptions while restarting
  • Loading branch information
Martin-Molinero authored Dec 26, 2022
2 parents 9d4cdb2 + 4f1d0e5 commit 65cd739
Showing 1 changed file with 40 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ public override void Connect()
var attempt = 1;
const int maxAttempts = 5;

var subscribedSymbolsCount = _subscribedSymbols.Skip(0).Count();
var subscribedSymbolsCount = _subscriptionManager.GetSubscribedSymbols().Count();
if (subscribedSymbolsCount > 0)
{
Log.Trace($"InteractiveBrokersBrokerage.Connect(): Data subscription count {subscribedSymbolsCount}, restoring data subscriptions is required");
Expand Down Expand Up @@ -867,17 +867,10 @@ private bool HeartBeat(int waitTimeMs)

if (!_ibAutomater.IsWithinScheduledServerResetTimes() && IsConnected
// do not run heart beat if we are close to daily restarts
&& DateTime.Now.TimeOfDay < _heartBeatTimeLimit)
&& DateTime.Now.TimeOfDay < _heartBeatTimeLimit
// do not run heart beat if we are restarting
&& !IsRestartInProgress())
{
// we take the lock to avoid it getting disposed while we are evaluating it
lock(_gatewayRestartTokenSource ?? new object())
{
if (_gatewayRestartTokenSource != null && !_gatewayRestartTokenSource.IsCancellationRequested)
{
// do not run heart beat if we are restarting
return true;
}
}
_currentTimeEvent.Reset();
// request current time to the server
_client.ClientSocket.reqCurrentTime();
Expand Down Expand Up @@ -1697,7 +1690,7 @@ private void RestoreDataSubscriptions()
List<Symbol> subscribedSymbols;
lock (_sync)
{
subscribedSymbols = _subscribedSymbols.Keys.ToList();
subscribedSymbols = _subscriptionManager.GetSubscribedSymbols().ToList();

_subscribedSymbols.Clear();
_subscribedTickers.Clear();
Expand Down Expand Up @@ -2901,6 +2894,11 @@ public IEnumerator<BaseData> Subscribe(SubscriptionDataConfig dataConfig, EventH
/// <param name="symbols">The symbols to be added keyed by SecurityType</param>
private bool Subscribe(IEnumerable<Symbol> symbols)
{
if (!CanHandleSubscriptionRequest(symbols, "subscribe"))
{
return true;
}

try
{
foreach (var symbol in symbols)
Expand Down Expand Up @@ -2968,12 +2966,30 @@ public void Unsubscribe(SubscriptionDataConfig dataConfig)
_aggregator.Remove(dataConfig);
}

private bool CanHandleSubscriptionRequest(IEnumerable<Symbol> symbols, string message)
{
var result = !IsRestartInProgress() && IsConnected;
if (!result)
{
// skip while restarting and not connected, once restart has ended and we are connected
// we will restore data subscriptions asking the _subscriptionManager we want to avoid the race condition where
// we are subscribing mid restart and we send IB an invalid request Id
Log.Trace($"InteractiveBrokersBrokerage.CanHandleSubscription(): skip request for [{string.Join(",", symbols)}] {message}");
}
return result;
}

/// <summary>
/// Removes the specified symbols to the subscription
/// </summary>
/// <param name="symbols">The symbols to be removed keyed by SecurityType</param>
private bool Unsubscribe(IEnumerable<Symbol> symbols)
{
if (!CanHandleSubscriptionRequest(symbols, "unsubscribe"))
{
return true;
}

try
{
foreach (var symbol in symbols)
Expand Down Expand Up @@ -3833,21 +3849,31 @@ private void OnIbAutomaterOutputDataReceived(object sender, OutputDataReceivedEv

private void StopGatewayRestartTask()
{
if (_gatewayRestartTokenSource != null && !_gatewayRestartTokenSource.IsCancellationRequested)
if (IsRestartInProgress())
{
_gatewayRestartTokenSource.Cancel();
Log.Trace($"InteractiveBrokersBrokerage.StopGatewayRestartTask(): cancelled");
}
}

private bool IsRestartInProgress()
{
// we take the lock to avoid it getting disposed while we are evaluating it
lock (_gatewayRestartTokenSource ?? new object())
{
// check if we are restarting
return _gatewayRestartTokenSource != null && !_gatewayRestartTokenSource.IsCancellationRequested;
}
}

/// <summary>
/// Rarely the gateways goes into an invalid state until it's restarted, so we restart the gateway from within so 2FA is not requested
/// </summary>
private void StartGatewayRestartTask()
{
try
{
if (_isDisposeCalled || _gatewayRestartTokenSource != null && !_gatewayRestartTokenSource.IsCancellationRequested)
if (_isDisposeCalled || IsRestartInProgress())
{
// if we are disposed or we already triggered the restart skip a new call
var message = _isDisposeCalled ? "we are disposed" : "restart task already scheduled";
Expand Down

0 comments on commit 65cd739

Please sign in to comment.