diff --git a/src/MarginTrading.AzureRepositories/MarginTrading.AzureRepositories.csproj b/src/MarginTrading.AzureRepositories/MarginTrading.AzureRepositories.csproj index 101699c62..c4a2cade1 100644 --- a/src/MarginTrading.AzureRepositories/MarginTrading.AzureRepositories.csproj +++ b/src/MarginTrading.AzureRepositories/MarginTrading.AzureRepositories.csproj @@ -6,7 +6,7 @@ false false false - 2.31.0 + 2.29.4 8.0 diff --git a/src/MarginTrading.Backend.Contracts/MarginTrading.Backend.Contracts.csproj b/src/MarginTrading.Backend.Contracts/MarginTrading.Backend.Contracts.csproj index ea284313e..955c53df6 100644 --- a/src/MarginTrading.Backend.Contracts/MarginTrading.Backend.Contracts.csproj +++ b/src/MarginTrading.Backend.Contracts/MarginTrading.Backend.Contracts.csproj @@ -1,7 +1,7 @@  net6.0 - 2.31.0 + 2.29.4 Lykke.MarginTrading.BackendSnow.Contracts 8.0 true diff --git a/src/MarginTrading.Backend.Core.Mappers/MarginTrading.Backend.Core.Mappers.csproj b/src/MarginTrading.Backend.Core.Mappers/MarginTrading.Backend.Core.Mappers.csproj index 1e1b6aa65..f78632bd9 100644 --- a/src/MarginTrading.Backend.Core.Mappers/MarginTrading.Backend.Core.Mappers.csproj +++ b/src/MarginTrading.Backend.Core.Mappers/MarginTrading.Backend.Core.Mappers.csproj @@ -1,7 +1,7 @@  net6.0 - 2.31.0 + 2.29.4 8.0 diff --git a/src/MarginTrading.Backend.Core/Extensions/SagaExtensions.cs b/src/MarginTrading.Backend.Core/Extensions/SagaExtensions.cs index cdf4e6fdc..67bb35af0 100644 --- a/src/MarginTrading.Backend.Core/Extensions/SagaExtensions.cs +++ b/src/MarginTrading.Backend.Core/Extensions/SagaExtensions.cs @@ -28,7 +28,7 @@ public static bool SwitchState(this OperationDataBase data, TSta if (Convert.ToInt32(data.State) > Convert.ToInt32(expectedState)) { - LogLocator.CommonLog.WriteWarning(nameof(SagaExtensions), nameof(SwitchState), + LogLocator.CommonLog.WriteWarning(nameof(SagaExtensions), nameof(SwitchToState), $"Operation is already in the next state, so this event is ignored, {new {data, expectedState, nextState}.ToJson()}."); return false; } @@ -38,6 +38,14 @@ public static bool SwitchState(this OperationDataBase data, TSta return true; } + public static bool SwitchToState(this OperationDataBase data, + SpecialLiquidationOperationState nextState) => + data.SwitchState(data.State, nextState); + + public static bool SwitchToState(this IOperationExecutionInfo info, + SpecialLiquidationOperationState nextState) => + info.Data.SwitchToState(nextState); + public static bool SwitchState(this OperationDataBase data, SpecialLiquidationOperationState expectedState, SpecialLiquidationOperationState nextState) { @@ -55,7 +63,7 @@ public static bool SwitchState(this OperationDataBase Convert.ToInt32(expectedState)) { - LogLocator.CommonLog.WriteWarning(nameof(SagaExtensions), nameof(SwitchState), + LogLocator.CommonLog.WriteWarning(nameof(SagaExtensions), nameof(SwitchToState), $"Operation is already in the next state, so this event is ignored, {new {data, expectedState, nextState}.ToJson()}."); return false; } @@ -63,7 +71,7 @@ public static bool SwitchState(this OperationDataBasefalse false false - 2.31.0 + 2.29.4 8.0 diff --git a/src/MarginTrading.Backend.Core/Repositories/IOrdersHistoryRepository.cs b/src/MarginTrading.Backend.Core/Repositories/IOrdersHistoryRepository.cs index 500154f82..326ddaa61 100644 --- a/src/MarginTrading.Backend.Core/Repositories/IOrdersHistoryRepository.cs +++ b/src/MarginTrading.Backend.Core/Repositories/IOrdersHistoryRepository.cs @@ -10,6 +10,6 @@ namespace MarginTrading.Backend.Core.Repositories { public interface IOrdersHistoryRepository { - Task> GetLastSnapshot(DateTime @from); + Task> GetLastSnapshot(DateTime @from, DateTime? @to = null); } } diff --git a/src/MarginTrading.Backend.Services/Extensions/TradingEngineSnapshotExtensions.cs b/src/MarginTrading.Backend.Services/Extensions/TradingEngineSnapshotExtensions.cs index 87bd3b222..879399dd6 100644 --- a/src/MarginTrading.Backend.Services/Extensions/TradingEngineSnapshotExtensions.cs +++ b/src/MarginTrading.Backend.Services/Extensions/TradingEngineSnapshotExtensions.cs @@ -88,6 +88,9 @@ public static bool Initialized(this IDraftSnapshotKeeper keeper) public static bool IsPlatformClosureEvent(this MarketStateChangedEvent evt) => evt.Id == LykkeConstants.PlatformMarketIdentifier && !evt.IsEnabled; + public static bool IsNotPlatformClosureEvent(this MarketStateChangedEvent evt) => + !evt.IsPlatformClosureEvent(); + private static List GetOrders(this TradingEngineSnapshot snapshot) { return string.IsNullOrWhiteSpace(snapshot.OrdersJson) diff --git a/src/MarginTrading.Backend.Services/Helpers/LiquidationHelper.cs b/src/MarginTrading.Backend.Services/Helpers/LiquidationHelper.cs index c90be0a68..81138a729 100644 --- a/src/MarginTrading.Backend.Services/Helpers/LiquidationHelper.cs +++ b/src/MarginTrading.Backend.Services/Helpers/LiquidationHelper.cs @@ -4,15 +4,23 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading.Tasks; using Common; +using Lykke.Cqrs; +using MarginTrading.Backend.Contracts.Workflow.SpecialLiquidation.Commands; using MarginTrading.Backend.Core; using MarginTrading.Backend.Core.Exceptions; +using MarginTrading.Backend.Core.Extensions; using MarginTrading.Backend.Core.MatchingEngines; using MarginTrading.Backend.Core.Orders; +using MarginTrading.Backend.Core.Repositories; +using MarginTrading.Backend.Core.Settings; using MarginTrading.Backend.Core.Trading; using MarginTrading.Backend.Services.AssetPairs; using MarginTrading.Backend.Services.Infrastructure; +using MarginTrading.Backend.Services.Services; using MarginTrading.Backend.Services.Workflow.Liquidation.Commands; +using MarginTrading.Backend.Services.Workflow.SpecialLiquidation.Commands; using MarginTrading.Common.Services; using MoreLinq; @@ -25,19 +33,34 @@ public class LiquidationHelper private readonly ICqrsSender _cqrsSender; private readonly OrdersCache _ordersCache; private readonly IAssetPairDayOffService _assetPairDayOffService; + private readonly IAssetPairsCache _assetPairsCache; + private readonly CqrsContextNamesSettings _cqrsContextNamesSettings; + private readonly MarginTradingSettings _marginTradingSettings; + private readonly IOperationExecutionInfoRepository _operationExecutionInfoRepository; + private readonly IRfqService _specialLiquidationService; public LiquidationHelper(IMatchingEngineRouter matchingEngineRouter, IDateService dateService, ICqrsSender cqrsSender, OrdersCache ordersCache, - IAssetPairDayOffService assetPairDayOffService) + IAssetPairDayOffService assetPairDayOffService, + IAssetPairsCache assetPairsCache, + CqrsContextNamesSettings cqrsContextNamesSettings, + IOperationExecutionInfoRepository operationExecutionInfoRepository, + IRfqService specialLiquidationService, + MarginTradingSettings marginTradingSettings) { _matchingEngineRouter = matchingEngineRouter; _dateService = dateService; _cqrsSender = cqrsSender; _ordersCache = ordersCache; _assetPairDayOffService = assetPairDayOffService; + _assetPairsCache = assetPairsCache; + _cqrsContextNamesSettings = cqrsContextNamesSettings; + _operationExecutionInfoRepository = operationExecutionInfoRepository; + _specialLiquidationService = specialLiquidationService; + _marginTradingSettings = marginTradingSettings; } public bool CheckIfNetVolumeCanBeLiquidated(string assetPairId, Position[] positions, @@ -138,6 +161,91 @@ public bool CheckIfNetVolumeCanBeLiquidated(string assetPairId, Position[] posit return result; } + + public async Task FailIfInstrumentDiscontinued(IOperationExecutionInfo executionInfo, ICommandSender sender) + { + var isDiscontinued = _assetPairsCache.GetAssetPairById(executionInfo.Data.Instrument).IsDiscontinued; + + if (isDiscontinued) + { + if (executionInfo.Data.SwitchState(SpecialLiquidationOperationState.PriceRequested, + SpecialLiquidationOperationState.OnTheWayToFail)) + { + sender.SendCommand(new FailSpecialLiquidationInternalCommand + { + OperationId = executionInfo.Id, + CreationTime = _dateService.Now(), + Reason = "Instrument discontinuation", + + }, _cqrsContextNamesSettings.TradingEngine); + + await _operationExecutionInfoRepository.Save(executionInfo); + } + + return true; + } + + return false; + } + + public async Task InternalRetryPriceRequest(DateTime eventCreationTime, + ICommandSender sender, + IOperationExecutionInfo executionInfo, + TimeSpan retryTimeout) + { + // fix the intention to make another price request to not let the parallel + // ongoing GetPriceForSpecialLiquidationTimeoutInternalCommand execution + // break (fail) the flow + executionInfo.Data.NextRequestNumber(); + await _operationExecutionInfoRepository.Save(executionInfo); + + var shouldRetryAfter = eventCreationTime.Add(retryTimeout); + + var timeLeftBeforeRetry = shouldRetryAfter - _dateService.Now(); + + if (timeLeftBeforeRetry > TimeSpan.Zero) + { + await Task.Delay(timeLeftBeforeRetry); + } + + RequestPrice(sender, executionInfo); + } + + public void RequestPrice(ICommandSender sender, IOperationExecutionInfo + executionInfo) + { + //hack, requested by the bank + var positionsVolume = executionInfo.Data.Volume != 0 ? executionInfo.Data.Volume : 1; + + var command = new GetPriceForSpecialLiquidationCommand + { + OperationId = executionInfo.Id, + CreationTime = _dateService.Now(), + Instrument = executionInfo.Data.Instrument, + Volume = positionsVolume, + RequestNumber = executionInfo.Data.RequestNumber, + RequestedFromCorporateActions = executionInfo.Data.RequestedFromCorporateActions + }; + + if (_marginTradingSettings.ExchangeConnector == ExchangeConnectorType.RealExchangeConnector) + { + //send it to the Gavel + sender.SendCommand(command, _cqrsContextNamesSettings.Gavel); + } + else + { + _specialLiquidationService.SavePriceRequestForSpecialLiquidation(command); + } + + //special command is sent instantly for timeout control.. it is retried until timeout occurs + sender.SendCommand(new GetPriceForSpecialLiquidationTimeoutInternalCommand + { + OperationId = executionInfo.Id, + CreationTime = _dateService.Now(), + TimeoutSeconds = _marginTradingSettings.SpecialLiquidation.PriceRequestTimeoutSec, + RequestNumber = executionInfo.Data.RequestNumber + }, _cqrsContextNamesSettings.TradingEngine); + } public static string GetComment(LiquidationType liquidationType) { diff --git a/src/MarginTrading.Backend.Services/Infrastructure/SnapshotValidationService.cs b/src/MarginTrading.Backend.Services/Infrastructure/SnapshotValidationService.cs index cfb4253d5..edbaf2710 100644 --- a/src/MarginTrading.Backend.Services/Infrastructure/SnapshotValidationService.cs +++ b/src/MarginTrading.Backend.Services/Infrastructure/SnapshotValidationService.cs @@ -62,7 +62,8 @@ await _log.WriteInfoAsync(nameof(SnapshotValidationService), nameof(ValidateCurr var lastOrders = GetOrders(tradingEngineSnapshot); var lastPositions = GetPositions(tradingEngineSnapshot); - var ordersHistory = await _ordersHistoryRepository.GetLastSnapshot(tradingEngineSnapshot.Timestamp); + var latestOrder = currentOrders.MaxBy(x => x.LastModified); + var ordersHistory = await _ordersHistoryRepository.GetLastSnapshot(tradingEngineSnapshot.Timestamp, latestOrder?.LastModified); var positionsHistory = await _positionsHistoryRepository.GetLastSnapshot(tradingEngineSnapshot.Timestamp); var restoredOrders = RestoreOrdersCurrentStateFromHistory(lastOrders, ordersHistory); diff --git a/src/MarginTrading.Backend.Services/MarginTrading.Backend.Services.csproj b/src/MarginTrading.Backend.Services/MarginTrading.Backend.Services.csproj index 1b161eda3..4a50e189e 100644 --- a/src/MarginTrading.Backend.Services/MarginTrading.Backend.Services.csproj +++ b/src/MarginTrading.Backend.Services/MarginTrading.Backend.Services.csproj @@ -6,7 +6,7 @@ false false false - 2.31.0 + 2.29.4 8.0 @@ -32,7 +32,7 @@ - + diff --git a/src/MarginTrading.Backend.Services/Modules/CqrsModule.cs b/src/MarginTrading.Backend.Services/Modules/CqrsModule.cs index 8ec883162..67fb960a7 100644 --- a/src/MarginTrading.Backend.Services/Modules/CqrsModule.cs +++ b/src/MarginTrading.Backend.Services/Modules/CqrsModule.cs @@ -72,6 +72,10 @@ protected override void Load(ContainerBuilder builder) .SingleInstance(); builder.RegisterInstance(new CqrsContextNamesSettings()).AsSelf().SingleInstance(); + builder.RegisterType() + .AsImplementedInterfaces() + .SingleInstance(); + // Sagas & command handlers builder.RegisterAssemblyTypes(GetType().Assembly).Where(t => new[] { "Saga", "CommandsHandler", "Projection" }.Any(ending => t.Name.EndsWith(ending))).AsSelf(); diff --git a/src/MarginTrading.Backend.Services/Services/RfqPauseService.cs b/src/MarginTrading.Backend.Services/Services/RfqPauseService.cs index f249e6cb3..fd47987ba 100644 --- a/src/MarginTrading.Backend.Services/Services/RfqPauseService.cs +++ b/src/MarginTrading.Backend.Services/Services/RfqPauseService.cs @@ -73,7 +73,7 @@ public async Task AddAsync(string operationId, PauseSource so { var existingPause = (await _pauseRepository.FindAsync( operationId, - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, NotCancelledPredicate)) .SingleOrDefault(); @@ -84,7 +84,7 @@ public async Task AddAsync(string operationId, PauseSource so } var executionInfo = await _executionInfoRepository - .GetAsync(SpecialLiquidationSaga.OperationName, operationId); + .GetAsync(SpecialLiquidationSaga.Name, operationId); if (executionInfo == null) return RfqPauseErrorCode.NotFound; @@ -99,7 +99,7 @@ await _log.WriteWarningAsync(nameof(RfqPauseService), nameof(AddAsync), var pause = Pause.Create( operationId, - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, source, initiator, _dateService.Now()); @@ -123,7 +123,7 @@ public async Task GetCurrentAsync(string operationId) return (await _pauseRepository.FindAsync( operationId, - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, NotCancelledPredicate)) .SingleOrDefault(); } @@ -138,21 +138,21 @@ public async Task AcknowledgeAsync(string operationId) { var activePause = (await _pauseRepository.FindAsync( operationId, - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, ActivePredicate)) .SingleOrDefault(); if (activePause != null) { await _log.WriteInfoAsync(nameof(RfqPauseService), nameof(AcknowledgeAsync), null, - $"The pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.OperationName}] is effective since [{activePause.EffectiveSince}]"); + $"The pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.Name}] is effective since [{activePause.EffectiveSince}]"); return true; } var pendingPause = (await _pauseRepository.FindAsync( operationId, - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, PendingPredicate)) .SingleOrDefault(); @@ -173,7 +173,7 @@ await _log.WriteInfoAsync(nameof(RfqPauseService), nameof(AcknowledgeAsync), nul if (!updated) { await _log.WriteWarningAsync(nameof(RfqPauseService), nameof(AcknowledgeAsync), null, - $"Couldn't activate pending pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.OperationName}]"); + $"Couldn't activate pending pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.Name}]"); return false; } @@ -201,7 +201,7 @@ public async Task StopPendingAsync(string operationId, PauseCancellationSource s { var pendingPause = (await _pauseRepository.FindAsync( operationId, - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, PendingPredicate)) .SingleOrDefault(); @@ -222,7 +222,7 @@ public async Task StopPendingAsync(string operationId, PauseCancellationSource s if (!updated) { await _log.WriteWarningAsync(nameof(RfqPauseService), nameof(StopPendingAsync), null, - $"Couldn't stop pending pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.OperationName}]"); + $"Couldn't stop pending pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.Name}]"); } } @@ -244,7 +244,7 @@ public async Task AcknowledgeCancellationAsync(string operationId) { var pendingCancellationPause = (await _pauseRepository.FindAsync( operationId, - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, PendingCancellationPredicate)) .SingleOrDefault(); @@ -265,7 +265,7 @@ public async Task AcknowledgeCancellationAsync(string operationId) if (!updated) { await _log.WriteWarningAsync(nameof(RfqPauseService), nameof(AcknowledgeCancellationAsync), null, - $"Couldn't cancel pending cancellation pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.OperationName}]"); + $"Couldn't cancel pending cancellation pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.Name}]"); return false; } @@ -295,21 +295,21 @@ public async Task ResumeAsync(string operationId, PauseCance try { var executionInfo = await _executionInfoRepository - .GetAsync(SpecialLiquidationSaga.OperationName, operationId); + .GetAsync(SpecialLiquidationSaga.Name, operationId); if (executionInfo == null) return RfqResumeErrorCode.NotFound; var activePause = (await _pauseRepository.FindAsync( operationId, - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, ActivePredicate)) .SingleOrDefault(); if (activePause == null) { await _log.WriteInfoAsync(nameof(RfqPauseService), nameof(ResumeAsync), null, - $"The active pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.OperationName}] was not found"); + $"The active pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.Name}] was not found"); return RfqResumeErrorCode.NotPaused; } @@ -342,7 +342,7 @@ await _log.WriteWarningAsync(nameof(RfqPauseService), nameof(ResumeAsync), null, else { await _log.WriteWarningAsync(nameof(RfqPauseService), nameof(ResumeAsync), null, - $"Couldn't cancel active pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.OperationName}] due to database issues"); + $"Couldn't cancel active pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.Name}] due to database issues"); return RfqResumeErrorCode.Persistence; } diff --git a/src/MarginTrading.Backend.Services/Workflow/ISagaEventHandler.cs b/src/MarginTrading.Backend.Services/Workflow/ISagaEventHandler.cs new file mode 100644 index 000000000..4575df0da --- /dev/null +++ b/src/MarginTrading.Backend.Services/Workflow/ISagaEventHandler.cs @@ -0,0 +1,18 @@ +// Copyright (c) 2019 Lykke Corp. +// See the LICENSE file in the project root for more information. + +using System.Threading.Tasks; +using Lykke.Cqrs; + +namespace MarginTrading.Backend.Services.Workflow +{ + /// + /// Base interface for all saga event handlers + /// + public interface ISagaEventHandler + { + Task Handle(TEvent @event, ICommandSender sender); + + Task CanHandle(TEvent @event) => Task.FromResult(true); + } +} \ No newline at end of file diff --git a/src/MarginTrading.Backend.Services/Workflow/ISpecialLiquidationSagaEventHandler.cs b/src/MarginTrading.Backend.Services/Workflow/ISpecialLiquidationSagaEventHandler.cs new file mode 100644 index 000000000..190c5fec6 --- /dev/null +++ b/src/MarginTrading.Backend.Services/Workflow/ISpecialLiquidationSagaEventHandler.cs @@ -0,0 +1,12 @@ +// Copyright (c) 2019 Lykke Corp. +// See the LICENSE file in the project root for more information. + +namespace MarginTrading.Backend.Services.Workflow +{ + /// + /// Marker interface for special liquidation saga event handlers + /// + public interface ISpecialLiquidationSagaEventHandler + { + } +} \ No newline at end of file diff --git a/src/MarginTrading.Backend.Services/Workflow/PlatformClosureProjection.cs b/src/MarginTrading.Backend.Services/Workflow/PlatformClosureProjection.cs index 677b7a073..02811f3c7 100644 --- a/src/MarginTrading.Backend.Services/Workflow/PlatformClosureProjection.cs +++ b/src/MarginTrading.Backend.Services/Workflow/PlatformClosureProjection.cs @@ -36,44 +36,67 @@ public PlatformClosureProjection(ISnapshotService snapshotService, [UsedImplicitly] public async Task Handle(MarketStateChangedEvent e) { - if (!e.IsPlatformClosureEvent()) + if (e.IsNotPlatformClosureEvent()) return; - var tradingDay = DateOnly.FromDateTime(e.EventTimestamp); - - string result; try { - result = await _snapshotService.MakeTradingDataSnapshot(e.EventTimestamp.Date, - _identityGenerator.GenerateGuid(), - SnapshotStatus.Draft); + var successMessage = await CreateDraftSnapshot(e.EventTimestamp.Date); + await LogIfSucceeded(successMessage, e); } catch (Exception ex) { - await _log.WriteWarningAsync(nameof(PlatformClosureProjection), - nameof(Handle), - e.ToJson(), - $"Failed to make trading data draft snapshot for [{tradingDay}]", ex); - - if (tradingDay < _dateService.NowDateOnly()) + var exceptionExpected = await IsExceptionExpected(ex, e); + if (!exceptionExpected) { - await _log.WriteWarningAsync(nameof(PlatformClosureProjection), - nameof(Handle), - e.ToJson(), - "The event is for the past date, so the snapshot draft will not be created.", ex); - return; + throw; } - - throw; } + } + + private async Task LogIfSucceeded(string successMessage, MarketStateChangedEvent evt) + { + var failed = string.IsNullOrWhiteSpace(successMessage); + if (failed) return; + + await _log.WriteInfoAsync(nameof(PlatformClosureProjection), + nameof(LogIfSucceeded), + evt.ToJson(), + successMessage); + } + + private async Task CreateDraftSnapshot(DateTime tradingDay) + { + var result = await _snapshotService.MakeTradingDataSnapshot(tradingDay, + _identityGenerator.GenerateGuid(), + SnapshotStatus.Draft); + return result; + } - if (!string.IsNullOrWhiteSpace(result)) + private async Task IsExceptionExpected(Exception ex, MarketStateChangedEvent evt) + { + if (IsEventForPastDate(evt)) { - await _log.WriteInfoAsync(nameof(PlatformClosureProjection), - nameof(Handle), - e.ToJson(), - result); + await _log.WriteWarningAsync(nameof(PlatformClosureProjection), + nameof(IsExceptionExpected), + evt.ToJson(), + "The event is for the past date, so the snapshot draft will not be created.", ex); + return true; } + + var tradingDayFromEvent = DateOnly.FromDateTime(evt.EventTimestamp); + await _log.WriteErrorAsync(nameof(PlatformClosureProjection), + nameof(IsExceptionExpected), + new {eventJson = evt.ToJson(), tradingDay = tradingDayFromEvent}.ToJson(), + ex); + + return false; + } + + private bool IsEventForPastDate(MarketStateChangedEvent evt) + { + var tradingDayFromEvent = DateOnly.FromDateTime(evt.EventTimestamp); + return tradingDayFromEvent < _dateService.NowDateOnly(); } } } \ No newline at end of file diff --git a/src/MarginTrading.Backend.Services/Workflow/SagaEventHandlerExtensions.cs b/src/MarginTrading.Backend.Services/Workflow/SagaEventHandlerExtensions.cs new file mode 100644 index 000000000..e01c9fbc5 --- /dev/null +++ b/src/MarginTrading.Backend.Services/Workflow/SagaEventHandlerExtensions.cs @@ -0,0 +1,53 @@ +// Copyright (c) 2019 Lykke Corp. +// See the LICENSE file in the project root for more information. + +using System.Collections.Generic; +using System.Threading.Tasks; +using Lykke.Cqrs; + +namespace MarginTrading.Backend.Services.Workflow +{ + public static class SagaEventHandlerExtensions + { + /// + /// Find handler which implements ISagaEventHandler{TEvent} by convention + /// + /// + /// + /// + /// When no handler found + public static ISagaEventHandler First( + this IEnumerable handlers) + { + foreach (var handler in handlers) + { + if (handler is ISagaEventHandler typedHandler) + { + return typedHandler; + } + } + + throw new KeyNotFoundException($"No handler for {typeof(TEvent)} found"); + } + + /// + /// Finds handler which implements ISagaEventHandler{TEvent} by convention and calls it + /// + /// + /// + /// + /// + /// + /// Before calling handler checks if it can handle event + public static async Task Handle( + this IEnumerable handlers, + TEvent @event, + ICommandSender sender) + { + var firstHandler = handlers.First(); + + if (await firstHandler.CanHandle(@event)) + await firstHandler.Handle(@event, sender); + } + } +} \ No newline at end of file diff --git a/src/MarginTrading.Backend.Services/Workflow/SpecialLiquidation/Commands/FailSpecialLiquidationInternalCommand.cs b/src/MarginTrading.Backend.Services/Workflow/SpecialLiquidation/Commands/FailSpecialLiquidationInternalCommand.cs index ce24515ba..1a425dc88 100644 --- a/src/MarginTrading.Backend.Services/Workflow/SpecialLiquidation/Commands/FailSpecialLiquidationInternalCommand.cs +++ b/src/MarginTrading.Backend.Services/Workflow/SpecialLiquidation/Commands/FailSpecialLiquidationInternalCommand.cs @@ -17,5 +17,8 @@ public class FailSpecialLiquidationInternalCommand [Key(2)] public string Reason { get; set; } + + [Key(3)] + public bool? CanRetryPriceRequest { get; set; } } } \ No newline at end of file diff --git a/src/MarginTrading.Backend.Services/Workflow/SpecialLiquidation/SpecialLiquidationCommandsHandler.cs b/src/MarginTrading.Backend.Services/Workflow/SpecialLiquidation/SpecialLiquidationCommandsHandler.cs index ec42ceda2..73c66cdf4 100644 --- a/src/MarginTrading.Backend.Services/Workflow/SpecialLiquidation/SpecialLiquidationCommandsHandler.cs +++ b/src/MarginTrading.Backend.Services/Workflow/SpecialLiquidation/SpecialLiquidationCommandsHandler.cs @@ -179,10 +179,10 @@ await _log.WriteWarningAsync( } var (executionInfo, _) = await _operationExecutionInfoRepository.GetOrAddAsync( - operationName: SpecialLiquidationSaga.OperationName, + operationName: SpecialLiquidationSaga.Name, operationId: command.OperationId, factory: () => new OperationExecutionInfo( - operationName: SpecialLiquidationSaga.OperationName, + operationName: SpecialLiquidationSaga.Name, id: command.OperationId, lastModified: _dateService.Now(), data: new SpecialLiquidationOperationData @@ -275,10 +275,10 @@ await _log.WriteWarningAsync( } var (executionInfo, _) = await _operationExecutionInfoRepository.GetOrAddAsync( - operationName: SpecialLiquidationSaga.OperationName, + operationName: SpecialLiquidationSaga.Name, operationId: command.OperationId, factory: () => new OperationExecutionInfo( - operationName: SpecialLiquidationSaga.OperationName, + operationName: SpecialLiquidationSaga.Name, id: command.OperationId, lastModified: _dateService.Now(), data: new SpecialLiquidationOperationData @@ -313,7 +313,7 @@ private async Task Handle(GetPriceForSpecialLiquidationTi IEventPublisher publisher) { var executionInfo = await _operationExecutionInfoRepository.GetAsync( - operationName: SpecialLiquidationSaga.OperationName, + operationName: SpecialLiquidationSaga.Name, id: command.OperationId); if (executionInfo?.Data != null) @@ -356,7 +356,7 @@ await _log.WriteInfoAsync(nameof(SpecialLiquidationCommandsHandler), nameof(GetP private async Task Handle(ExecuteSpecialLiquidationOrderCommand command, IEventPublisher publisher) { var executionInfo = await _operationExecutionInfoRepository.GetAsync( - operationName: SpecialLiquidationSaga.OperationName, + operationName: SpecialLiquidationSaga.Name, id: command.OperationId); if (executionInfo?.Data == null) @@ -515,7 +515,7 @@ await _log.WriteWarningAsync(nameof(SpecialLiquidationCommandsHandler), private async Task Handle(ExecuteSpecialLiquidationOrdersInternalCommand command, IEventPublisher publisher) { var executionInfo = await _operationExecutionInfoRepository.GetAsync( - operationName: SpecialLiquidationSaga.OperationName, + operationName: SpecialLiquidationSaga.Name, id: command.OperationId); if (executionInfo?.Data == null) @@ -570,7 +570,7 @@ await _tradingEngine.LiquidatePositionsUsingSpecialWorkflowAsync( private async Task Handle(FailSpecialLiquidationInternalCommand command, IEventPublisher publisher) { var executionInfo = await _operationExecutionInfoRepository.GetAsync( - operationName: SpecialLiquidationSaga.OperationName, + operationName: SpecialLiquidationSaga.Name, id: command.OperationId); if (executionInfo?.Data == null) @@ -586,6 +586,7 @@ private async Task Handle(FailSpecialLiquidationInternalCommand command, IEventP OperationId = command.OperationId, CreationTime = _dateService.Now(), Reason = command.Reason, + CanRetryPriceRequest = command.CanRetryPriceRequest ?? false }); _chaosKitty.Meow(command.OperationId); @@ -598,7 +599,7 @@ private async Task Handle(FailSpecialLiquidationInternalCommand command, IEventP private async Task Handle(CancelSpecialLiquidationCommand command, IEventPublisher publisher) { var executionInfo = await _operationExecutionInfoRepository.GetAsync( - operationName: SpecialLiquidationSaga.OperationName, + operationName: SpecialLiquidationSaga.Name, id: command.OperationId); if (executionInfo?.Data == null) @@ -628,7 +629,7 @@ private async Task Handle(CancelSpecialLiquidationCommand command, IEventPublish private async Task Handle(ClosePositionsRegularFlowCommand command, IEventPublisher publisher) { var executionInfo = await _operationExecutionInfoRepository.GetAsync( - operationName: SpecialLiquidationSaga.OperationName, + operationName: SpecialLiquidationSaga.Name, id: command.OperationId); if (executionInfo?.Data == null) @@ -662,7 +663,7 @@ private bool TryGetExchangeNameFromPositions(IEnumerable positions, ou private async Task Handle(ResumePausedSpecialLiquidationCommand command, IEventPublisher publisher) { var executionInfo = await _operationExecutionInfoRepository.GetAsync( - operationName: SpecialLiquidationSaga.OperationName, + operationName: SpecialLiquidationSaga.Name, id: command.OperationId); if (executionInfo?.Data == null) diff --git a/src/MarginTrading.Backend.Services/Workflow/SpecialLiquidation/SpecialLiquidationSaga.cs b/src/MarginTrading.Backend.Services/Workflow/SpecialLiquidation/SpecialLiquidationSaga.cs index 2e8cfd2d2..dc4b744a6 100644 --- a/src/MarginTrading.Backend.Services/Workflow/SpecialLiquidation/SpecialLiquidationSaga.cs +++ b/src/MarginTrading.Backend.Services/Workflow/SpecialLiquidation/SpecialLiquidationSaga.cs @@ -1,9 +1,7 @@ // Copyright (c) 2019 Lykke Corp. // See the LICENSE file in the project root for more information. -using System; using System.Collections.Generic; -using System.Linq; using System.Threading.Tasks; using Common.Log; using JetBrains.Annotations; @@ -31,17 +29,17 @@ public class SpecialLiquidationSaga private readonly IDateService _dateService; private readonly IChaosKitty _chaosKitty; private readonly IOperationExecutionInfoRepository _operationExecutionInfoRepository; - private readonly IRfqService _specialLiquidationService; - private readonly LiquidationHelper _liquidationHelper; private readonly OrdersCache _ordersCache; private readonly IRfqPauseService _rfqPauseService; - private readonly IAssetPairsCache _assetPairsCache; + private readonly LiquidationHelper _liquidationHelper; private readonly ILog _log; private readonly MarginTradingSettings _marginTradingSettings; private readonly CqrsContextNamesSettings _cqrsContextNamesSettings; - public const string OperationName = "SpecialLiquidation"; + public const string Name = "SpecialLiquidation"; + + private readonly IEnumerable _eventHandlers; public SpecialLiquidationSaga( IDateService dateService, @@ -50,30 +48,30 @@ public SpecialLiquidationSaga( IRfqService specialLiquidationService, MarginTradingSettings marginTradingSettings, CqrsContextNamesSettings cqrsContextNamesSettings, - LiquidationHelper liquidationHelper, OrdersCache ordersCache, IRfqPauseService rfqPauseService, ILog log, - IAssetPairsCache assetPairsCache) + IAssetPairsCache assetPairsCache, + LiquidationHelper liquidationHelper, + IEnumerable eventHandlers) { _dateService = dateService; _chaosKitty = chaosKitty; _operationExecutionInfoRepository = operationExecutionInfoRepository; - _specialLiquidationService = specialLiquidationService; _marginTradingSettings = marginTradingSettings; _cqrsContextNamesSettings = cqrsContextNamesSettings; - _liquidationHelper = liquidationHelper; _ordersCache = ordersCache; _rfqPauseService = rfqPauseService; _log = log; - _assetPairsCache = assetPairsCache; + _liquidationHelper = liquidationHelper; + _eventHandlers = eventHandlers; } [UsedImplicitly] private async Task Handle(SpecialLiquidationStartedInternalEvent e, ICommandSender sender) { var executionInfo = await _operationExecutionInfoRepository.GetAsync( - operationName: OperationName, + operationName: Name, id: e.OperationId); if (executionInfo?.Data == null) @@ -89,7 +87,7 @@ private async Task Handle(SpecialLiquidationStartedInternalEvent e, ICommandSend executionInfo.Data.RequestNumber = 1; executionInfo.Data.TryStartClosing(id => _ordersCache.Positions.GetPositionById(id), _dateService.Now); - RequestPrice(sender, executionInfo); + _liquidationHelper.RequestPrice(sender, executionInfo); _chaosKitty.Meow(e.OperationId); @@ -101,7 +99,7 @@ private async Task Handle(SpecialLiquidationStartedInternalEvent e, ICommandSend private async Task Handle(PriceForSpecialLiquidationCalculatedEvent e, ICommandSender sender) { var executionInfo = await _operationExecutionInfoRepository.GetAsync( - operationName: OperationName, + operationName: Name, id: e.OperationId); if (executionInfo?.Data == null) @@ -125,7 +123,7 @@ private async Task Handle(PriceForSpecialLiquidationCalculatedEvent e, ICommandS executionInfo.Data.NextRequestNumber(); - RequestPrice(sender, executionInfo); + _liquidationHelper.RequestPrice(sender, executionInfo); //switch state back, because we requested the price again and should handle in correctly when received executionInfo.Data.State = SpecialLiquidationOperationState.PriceRequested; @@ -160,25 +158,11 @@ await _rfqPauseService.StopPendingAsync(e.OperationId, PauseCancellationSource.P private async Task Handle(PriceForSpecialLiquidationCalculationFailedEvent e, ICommandSender sender) { var executionInfo = await _operationExecutionInfoRepository.GetAsync( - operationName: OperationName, + operationName: Name, id: e.OperationId); - - if (executionInfo?.Data == null) - return; - - if (PriceRequestRetryRequired(executionInfo.Data.RequestedFromCorporateActions)) - { - var isDiscontinued = await FailIfInstrumentDiscontinued(executionInfo, sender); - if (isDiscontinued) return; - var pauseAcknowledged = await _rfqPauseService.AcknowledgeAsync(executionInfo.Id); - if (pauseAcknowledged) return; - - await InternalRetryPriceRequest(e.CreationTime, sender, executionInfo, - _marginTradingSettings.SpecialLiquidation.PriceRequestRetryTimeout.Value); - + if (executionInfo?.Data == null) return; - } if (executionInfo.Data.SwitchState(SpecialLiquidationOperationState.PriceRequested, SpecialLiquidationOperationState.OnTheWayToFail)) @@ -187,7 +171,8 @@ await InternalRetryPriceRequest(e.CreationTime, sender, executionInfo, { OperationId = e.OperationId, CreationTime = _dateService.Now(), - Reason = e.Reason + Reason = e.Reason, + CanRetryPriceRequest = true }, _cqrsContextNamesSettings.TradingEngine); _chaosKitty.Meow(e.OperationId); @@ -200,7 +185,7 @@ await InternalRetryPriceRequest(e.CreationTime, sender, executionInfo, private async Task Handle(SpecialLiquidationOrderExecutedEvent e, ICommandSender sender) { var executionInfo = await _operationExecutionInfoRepository.GetAsync( - operationName: OperationName, + operationName: Name, id: e.OperationId); if (executionInfo?.Data == null) @@ -231,21 +216,21 @@ private async Task Handle(SpecialLiquidationOrderExecutedEvent e, ICommandSender private async Task Handle(SpecialLiquidationOrderExecutionFailedEvent e, ICommandSender sender) { var executionInfo = await _operationExecutionInfoRepository.GetAsync( - operationName: OperationName, + operationName: Name, id: e.OperationId); if (executionInfo?.Data == null) return; - if (PriceRequestRetryRequired(executionInfo.Data.RequestedFromCorporateActions)) + if (PriceRequestRetryRequired(executionInfo.Data.RequestedFromCorporateActions, _marginTradingSettings.SpecialLiquidation)) { - var isDiscontinued = await FailIfInstrumentDiscontinued(executionInfo, sender); + var isDiscontinued = await _liquidationHelper.FailIfInstrumentDiscontinued(executionInfo, sender); if (isDiscontinued) return; if (executionInfo.Data.SwitchState(SpecialLiquidationOperationState.ExternalOrderExecuted, SpecialLiquidationOperationState.PriceRequested)) { - await InternalRetryPriceRequest(e.CreationTime, sender, executionInfo, + await _liquidationHelper.InternalRetryPriceRequest(e.CreationTime, sender, executionInfo, _marginTradingSettings.SpecialLiquidation.PriceRequestRetryTimeout.Value); return; @@ -272,7 +257,7 @@ await InternalRetryPriceRequest(e.CreationTime, sender, executionInfo, private async Task Handle(SpecialLiquidationFinishedEvent e, ICommandSender sender) { var executionInfo = await _operationExecutionInfoRepository.GetAsync( - operationName: OperationName, + operationName: Name, id: e.OperationId); if (executionInfo?.Data == null) @@ -301,78 +286,13 @@ private async Task Handle(SpecialLiquidationFinishedEvent e, ICommandSender send } [UsedImplicitly] - private async Task Handle(SpecialLiquidationFailedEvent e, ICommandSender sender) - { - var executionInfo = await _operationExecutionInfoRepository.GetAsync( - operationName: OperationName, - id: e.OperationId); - - if (executionInfo?.Data == null) - return; - - if (e.CanRetryPriceRequest) - { - if (!executionInfo.Data.RequestedFromCorporateActions) - { - var positions = _ordersCache.Positions - .GetPositionsByAccountIds(executionInfo.Data.AccountId) - .Where(p => executionInfo.Data.PositionIds.Contains(p.Id)) - .ToArray(); - - if (_liquidationHelper.CheckIfNetVolumeCanBeLiquidated(executionInfo.Data.Instrument, positions, out _)) - { - // there is liquidity so we can cancel the special liquidation flow. - sender.SendCommand(new CancelSpecialLiquidationCommand - { - OperationId = e.OperationId, - Reason = "Liquidity is enough to close positions within regular flow" - }, _cqrsContextNamesSettings.TradingEngine); - return; - } - } - - if (PriceRequestRetryRequired(executionInfo.Data.RequestedFromCorporateActions)) - { - var pauseAcknowledged = await _rfqPauseService.AcknowledgeAsync(executionInfo.Id); - if (pauseAcknowledged) return; - - if (executionInfo.Data.SwitchState(executionInfo.Data.State, - SpecialLiquidationOperationState.PriceRequested)) - { - await InternalRetryPriceRequest(e.CreationTime, sender, executionInfo, - _marginTradingSettings.SpecialLiquidation.PriceRequestRetryTimeout.Value); - - return; - } - } - } - - if (executionInfo.Data.SwitchState(executionInfo.Data.State,//from any state - SpecialLiquidationOperationState.Failed)) - { - if (!string.IsNullOrEmpty(executionInfo.Data.CausationOperationId)) - { - sender.SendCommand(new ResumeLiquidationInternalCommand - { - OperationId = executionInfo.Data.CausationOperationId, - CreationTime = _dateService.Now(), - Comment = $"Resume after special liquidation {executionInfo.Id} failed. Reason: {e.Reason}", - IsCausedBySpecialLiquidation = true, - CausationOperationId = executionInfo.Id - }, _cqrsContextNamesSettings.TradingEngine); - } - - _chaosKitty.Meow(e.OperationId); + private Task Handle(SpecialLiquidationFailedEvent e, ICommandSender sender) => _eventHandlers.Handle(e, sender); - await _operationExecutionInfoRepository.Save(executionInfo); - } - } - [UsedImplicitly] private async Task Handle(SpecialLiquidationCancelledEvent e, ICommandSender sender) { var executionInfo = await _operationExecutionInfoRepository.GetAsync( - operationName: OperationName, + operationName: Name, id: e.OperationId); if (executionInfo?.Data == null) @@ -404,7 +324,7 @@ private async Task Handle(SpecialLiquidationCancelledEvent e, ICommandSender sen private async Task Handle(ResumePausedSpecialLiquidationFailedEvent e, ICommandSender sender) { var executionInfo = await _operationExecutionInfoRepository.GetAsync( - operationName: OperationName, + operationName: Name, id: e.OperationId); if (executionInfo?.Data == null) @@ -418,21 +338,21 @@ await _log.WriteWarningAsync(nameof(SpecialLiquidationSaga), private async Task Handle(ResumePausedSpecialLiquidationSucceededEvent e, ICommandSender sender) { var executionInfo = await _operationExecutionInfoRepository.GetAsync( - operationName: OperationName, + operationName: Name, id: e.OperationId); if (executionInfo?.Data == null) return; - if (PriceRequestRetryRequired(executionInfo.Data.RequestedFromCorporateActions)) + if (PriceRequestRetryRequired(executionInfo.Data.RequestedFromCorporateActions, _marginTradingSettings.SpecialLiquidation)) { - var isDiscontinued = await FailIfInstrumentDiscontinued(executionInfo, sender); + var isDiscontinued = await _liquidationHelper.FailIfInstrumentDiscontinued(executionInfo, sender); if (isDiscontinued) return; if (executionInfo.Data.SwitchState(executionInfo.Data.State, SpecialLiquidationOperationState.PriceRequested)) { - await InternalRetryPriceRequest(e.CreationTime, sender, executionInfo, + await _liquidationHelper.InternalRetryPriceRequest(e.CreationTime, sender, executionInfo, _marginTradingSettings.SpecialLiquidation.PriceRequestRetryTimeout.Value); return; @@ -446,7 +366,7 @@ await InternalRetryPriceRequest(e.CreationTime, sender, executionInfo, { OperationId = e.OperationId, CreationTime = _dateService.Now(), - Reason = $"Pause cancellation succeeded but then price request was not initiated for operation id [{e.OperationId} and name [{OperationName}]]" + Reason = $"Pause cancellation succeeded but then price request was not initiated for operation id [{e.OperationId} and name [{Name}]]" }, _cqrsContextNamesSettings.TradingEngine); _chaosKitty.Meow(e.OperationId); @@ -455,104 +375,8 @@ await InternalRetryPriceRequest(e.CreationTime, sender, executionInfo, } } - private decimal GetActualNetPositionCloseVolume(ICollection positionIds, string accountId) - { - var netPositionVolume = _ordersCache.GetPositions() - .Where(x => positionIds.Contains(x.Id) - && (string.IsNullOrEmpty(accountId) || x.AccountId == accountId)) - .Sum(x => x.Volume); - - return -netPositionVolume; - } - - private void RequestPrice(ICommandSender sender, IOperationExecutionInfo - executionInfo) - { - //hack, requested by the bank - var positionsVolume = executionInfo.Data.Volume != 0 ? executionInfo.Data.Volume : 1; - - var command = new GetPriceForSpecialLiquidationCommand - { - OperationId = executionInfo.Id, - CreationTime = _dateService.Now(), - Instrument = executionInfo.Data.Instrument, - Volume = positionsVolume, - RequestNumber = executionInfo.Data.RequestNumber, - RequestedFromCorporateActions = executionInfo.Data.RequestedFromCorporateActions - }; - - if (_marginTradingSettings.ExchangeConnector == ExchangeConnectorType.RealExchangeConnector) - { - //send it to the Gavel - sender.SendCommand(command, _cqrsContextNamesSettings.Gavel); - } - else - { - _specialLiquidationService.SavePriceRequestForSpecialLiquidation(command); - } - - //special command is sent instantly for timeout control.. it is retried until timeout occurs - sender.SendCommand(new GetPriceForSpecialLiquidationTimeoutInternalCommand - { - OperationId = executionInfo.Id, - CreationTime = _dateService.Now(), - TimeoutSeconds = _marginTradingSettings.SpecialLiquidation.PriceRequestTimeoutSec, - RequestNumber = executionInfo.Data.RequestNumber - }, _cqrsContextNamesSettings.TradingEngine); - } - - private bool PriceRequestRetryRequired(bool requestedFromCorporateActions) => - _marginTradingSettings.SpecialLiquidation.PriceRequestRetryTimeout.HasValue && - (!requestedFromCorporateActions || - _marginTradingSettings.SpecialLiquidation.RetryPriceRequestForCorporateActions); - - private async Task InternalRetryPriceRequest(DateTime eventCreationTime, - ICommandSender sender, - IOperationExecutionInfo executionInfo, - TimeSpan retryTimeout) - { - // fix the intention to make another price request to not let the parallel - // ongoing GetPriceForSpecialLiquidationTimeoutInternalCommand execution - // break (fail) the flow - executionInfo.Data.NextRequestNumber(); - await _operationExecutionInfoRepository.Save(executionInfo); - - var shouldRetryAfter = eventCreationTime.Add(retryTimeout); - - var timeLeftBeforeRetry = shouldRetryAfter - _dateService.Now(); - - if (timeLeftBeforeRetry > TimeSpan.Zero) - { - await Task.Delay(timeLeftBeforeRetry); - } - - RequestPrice(sender, executionInfo); - } - - private async Task FailIfInstrumentDiscontinued(IOperationExecutionInfo executionInfo, ICommandSender sender) - { - var isDiscontinued = _assetPairsCache.GetAssetPairById(executionInfo.Data.Instrument).IsDiscontinued; - - if (isDiscontinued) - { - if (executionInfo.Data.SwitchState(SpecialLiquidationOperationState.PriceRequested, - SpecialLiquidationOperationState.OnTheWayToFail)) - { - sender.SendCommand(new FailSpecialLiquidationInternalCommand - { - OperationId = executionInfo.Id, - CreationTime = _dateService.Now(), - Reason = "Instrument discontinuation", - - }, _cqrsContextNamesSettings.TradingEngine); - - await _operationExecutionInfoRepository.Save(executionInfo); - } - - return true; - } - - return false; - } + internal static bool PriceRequestRetryRequired(bool requestedFromCorporateActions, SpecialLiquidationSettings specialLiquidationSettings) => + specialLiquidationSettings.PriceRequestRetryTimeout.HasValue && + (!requestedFromCorporateActions || specialLiquidationSettings.RetryPriceRequestForCorporateActions); } } \ No newline at end of file diff --git a/src/MarginTrading.Backend.Services/Workflow/SpecialLiquidationCommandSenderExtensions.cs b/src/MarginTrading.Backend.Services/Workflow/SpecialLiquidationCommandSenderExtensions.cs new file mode 100644 index 000000000..9a6dcc636 --- /dev/null +++ b/src/MarginTrading.Backend.Services/Workflow/SpecialLiquidationCommandSenderExtensions.cs @@ -0,0 +1,61 @@ +// Copyright (c) 2019 Lykke Corp. +// See the LICENSE file in the project root for more information. + +using System; +using Lykke.Cqrs; +using MarginTrading.Backend.Services.Workflow.Liquidation.Commands; +using MarginTrading.Backend.Services.Workflow.SpecialLiquidation.Commands; + +namespace MarginTrading.Backend.Services.Workflow +{ + public static class SpecialLiquidationCommandSenderExtensions + { + /// + /// Sends a command to resume the initial flow (liquidation) + /// + /// + /// + /// + /// + /// + public static void SendResumeLiquidation(this ICommandSender sender, + string liquidationId, + string specialLiquidationId, + DateTime timestamp) + { + if (string.IsNullOrWhiteSpace(liquidationId)) + throw new ArgumentNullException(nameof(liquidationId)); + + if (string.IsNullOrWhiteSpace(specialLiquidationId)) + throw new ArgumentNullException(nameof(specialLiquidationId)); + + sender.SendCommand(new ResumeLiquidationInternalCommand + { + OperationId = liquidationId, + CreationTime = timestamp, + Comment = $"Resume after special liquidation {specialLiquidationId} failed.", + IsCausedBySpecialLiquidation = true, + CausationOperationId = specialLiquidationId + }, "TradingEngine"); + } + + /// + /// Sends a command to cancel special liquidation + /// + /// + /// + /// + /// + public static void SendCancellation(this ICommandSender sender, string liquidationId, string reason) + { + if (string.IsNullOrWhiteSpace(liquidationId)) + throw new ArgumentNullException(nameof(liquidationId)); + + sender.SendCommand(new CancelSpecialLiquidationCommand + { + OperationId = liquidationId, + Reason = reason + }, "TradingEngine"); + } + } +} \ No newline at end of file diff --git a/src/MarginTrading.Backend.Services/Workflow/SpecialLiquidationFailedEventHandler.cs b/src/MarginTrading.Backend.Services/Workflow/SpecialLiquidationFailedEventHandler.cs new file mode 100644 index 000000000..2da429636 --- /dev/null +++ b/src/MarginTrading.Backend.Services/Workflow/SpecialLiquidationFailedEventHandler.cs @@ -0,0 +1,223 @@ +// Copyright (c) 2019 Lykke Corp. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using JetBrains.Annotations; +using Lykke.Cqrs; +using MarginTrading.Backend.Contracts.Workflow.SpecialLiquidation.Events; +using MarginTrading.Backend.Core; +using MarginTrading.Backend.Core.Extensions; +using MarginTrading.Backend.Core.Repositories; +using MarginTrading.Backend.Core.Settings; +using MarginTrading.Backend.Services.Helpers; +using MarginTrading.Backend.Services.Services; +using MarginTrading.Backend.Services.Workflow.SpecialLiquidation; +using MarginTrading.Common.Services; +using ExecutionInfo = MarginTrading.Backend.Core.IOperationExecutionInfo; + +namespace MarginTrading.Backend.Services.Workflow +{ + /// + /// Event handler implementation for + /// + public class SpecialLiquidationFailedEventHandler : + ISagaEventHandler, + ISpecialLiquidationSagaEventHandler + { + /// + /// The possible verdicts of the event handler what to do next + /// + internal enum NextAction + { + /// + /// Finish the special liquidation flow + /// + Complete, + + /// + /// Cancel the special liquidation flow + /// + Cancel, + + /// + /// Pause the special liquidation flow + /// + Pause, + + /// + /// Continue with price request retry + /// + RetryPriceRequest, + + /// + /// Resume the initial flow caused the special liquidation + /// + ResumeInitialFlow + } + + /// + /// Map of the possible next states of the special liquidation flow. + /// Null means that the state should not be changed. + /// + private static readonly Dictionary NextStateMap = + new Dictionary + { + { NextAction.Complete, SpecialLiquidationOperationState.Failed }, + { NextAction.Cancel, SpecialLiquidationOperationState.Cancelled }, + { NextAction.Pause, null }, + { NextAction.RetryPriceRequest, SpecialLiquidationOperationState.PriceRequested }, + { NextAction.ResumeInitialFlow, SpecialLiquidationOperationState.Failed }, + }; + + private const string LiquidityIsSufficientReason = "Liquidity is sufficient to close positions within regular flow"; + + private readonly IDateService _dateService; + private readonly IOperationExecutionInfoRepository _operationExecutionInfoRepository; + private readonly OrdersCache _ordersCache; + private readonly LiquidationHelper _liquidationHelper; + private readonly MarginTradingSettings _marginTradingSettings; + private readonly IRfqPauseService _rfqPauseService; + private readonly IAssetPairsCache _assetPairsCache; + + public SpecialLiquidationFailedEventHandler(IOperationExecutionInfoRepository operationExecutionInfoRepository, + OrdersCache ordersCache, + LiquidationHelper liquidationHelper, + IRfqPauseService rfqPauseService, + IDateService dateService, + MarginTradingSettings marginTradingSettings, + IAssetPairsCache assetPairsCache) + { + _operationExecutionInfoRepository = operationExecutionInfoRepository; + _ordersCache = ordersCache; + _liquidationHelper = liquidationHelper; + _rfqPauseService = rfqPauseService; + _dateService = dateService; + _marginTradingSettings = marginTradingSettings; + _assetPairsCache = assetPairsCache; + } + + public async Task Handle(SpecialLiquidationFailedEvent @event, ICommandSender sender) + { + var executionInfo = await GetExecutionInfo(@event.OperationId); + + var instrument = _assetPairsCache.GetAssetPairById(executionInfo!.Data.Instrument); + + var nextAction = await DetermineNextAction( + executionInfo: executionInfo, + instrumentDiscontinued: instrument.IsDiscontinued, + liquidityIsEnough: GetLiquidityIsEnough, + canRetryPriceRequest: @event.CanRetryPriceRequest, + pauseIsAcknowledged: _rfqPauseService.AcknowledgeAsync, + configuration: _marginTradingSettings.SpecialLiquidation); + + var nextState = GetNextState(nextAction); + if (!await TrySaveState(executionInfo, nextState)) + return; + + switch (nextAction) + { + case NextAction.Cancel: + sender.SendCancellation(@event.OperationId, LiquidityIsSufficientReason); + break; + case NextAction.ResumeInitialFlow: + sender.SendResumeLiquidation(executionInfo.Data.CausationOperationId, + executionInfo.Id, + _dateService.Now()); + break; + case NextAction.RetryPriceRequest: + executionInfo = await GetExecutionInfo(@event.OperationId); + await _liquidationHelper.InternalRetryPriceRequest(@event.CreationTime, + sender, + executionInfo, + _marginTradingSettings.SpecialLiquidation.PriceRequestRetryTimeout!.Value); + break; + default: + return; + } + } + + public async Task CanHandle(SpecialLiquidationFailedEvent @event) + { + var executionInfo = await GetExecutionInfo(@event.OperationId); + + return executionInfo?.Data != null; + } + + /// + /// Pure business logic to determine what to be the next step of the special liquidation flow + /// + /// Current special liquidation execution info + /// Current state of discontinuation of the instrument + /// Function to check if there is enough liquidity to close the positions + /// Whether the price request can be retried + /// Function to check if the pause was requested and acknowledged + /// Special liquidation settings + /// + [Pure] + internal static async Task DetermineNextAction( + ExecutionInfo executionInfo, + bool instrumentDiscontinued, + Func liquidityIsEnough, + bool canRetryPriceRequest, + Func> pauseIsAcknowledged, + SpecialLiquidationSettings configuration) + { + if (instrumentDiscontinued) + return NextAction.Complete; + + var caInitiated = executionInfo.Data.RequestedFromCorporateActions; + if (!caInitiated && liquidityIsEnough(executionInfo)) + return NextAction.Cancel; + + var retryRequired = SpecialLiquidationSaga.PriceRequestRetryRequired(caInitiated, configuration); + if (retryRequired && canRetryPriceRequest) + { + if (await pauseIsAcknowledged(executionInfo.Id)) + return NextAction.Pause; + + return NextAction.RetryPriceRequest; + } + + var hasCausingLiquidation = !string.IsNullOrEmpty(executionInfo.Data.CausationOperationId); + if (hasCausingLiquidation) + return NextAction.ResumeInitialFlow; + + return NextAction.Complete; + } + + private static SpecialLiquidationOperationState? GetNextState(NextAction nextAction) => + NextStateMap.TryGetValue(nextAction, out var state) ? state : null; + + private bool GetLiquidityIsEnough(ExecutionInfo executionInfo) + { + var positions = _ordersCache.Positions + .GetPositionsByAccountIds(executionInfo.Data.AccountId) + .Where(p => executionInfo.Data.PositionIds.Contains(p.Id)) + .ToArray(); + + return _liquidationHelper.CheckIfNetVolumeCanBeLiquidated(executionInfo.Data.Instrument, positions, out _); + } + + private async Task TrySaveState(ExecutionInfo executionInfo, SpecialLiquidationOperationState? state) + { + if (!state.HasValue) + return false; + + if (executionInfo.SwitchToState(state.Value)) + { + await _operationExecutionInfoRepository.Save(executionInfo); + return true; + } + + return false; + } + + private Task> GetExecutionInfo(string operationId) => + _operationExecutionInfoRepository.GetAsync( + operationName: SpecialLiquidationSaga.Name, + operationId); + } +} \ No newline at end of file diff --git a/src/MarginTrading.Backend.TestClient/MarginTrading.Backend.TestClient.csproj b/src/MarginTrading.Backend.TestClient/MarginTrading.Backend.TestClient.csproj index dd4f260c1..3201d3090 100644 --- a/src/MarginTrading.Backend.TestClient/MarginTrading.Backend.TestClient.csproj +++ b/src/MarginTrading.Backend.TestClient/MarginTrading.Backend.TestClient.csproj @@ -3,7 +3,7 @@ Exe net6.0 8.0 - 2.31.0 + 2.29.4 1701;1702;1705;CA2007;0612;0618;1591 diff --git a/src/MarginTrading.Backend/MarginTrading.Backend.csproj b/src/MarginTrading.Backend/MarginTrading.Backend.csproj index da704ca10..9efd3d30f 100644 --- a/src/MarginTrading.Backend/MarginTrading.Backend.csproj +++ b/src/MarginTrading.Backend/MarginTrading.Backend.csproj @@ -8,7 +8,7 @@ false false false - 2.31.0 + 2.29.4 8.0 OutOfProcess AspNetCoreModuleV2 diff --git a/src/MarginTrading.Brokers/MarginTrading.AccountMarginEventsBroker/MarginTrading.AccountMarginEventsBroker.csproj b/src/MarginTrading.Brokers/MarginTrading.AccountMarginEventsBroker/MarginTrading.AccountMarginEventsBroker.csproj index b5ee2f710..eb9c91ae2 100644 --- a/src/MarginTrading.Brokers/MarginTrading.AccountMarginEventsBroker/MarginTrading.AccountMarginEventsBroker.csproj +++ b/src/MarginTrading.Brokers/MarginTrading.AccountMarginEventsBroker/MarginTrading.AccountMarginEventsBroker.csproj @@ -2,7 +2,7 @@ Exe net6.0 - 2.31.0 + 2.29.4 8.0 diff --git a/src/MarginTrading.Common/MarginTrading.Common.csproj b/src/MarginTrading.Common/MarginTrading.Common.csproj index 5c718f864..9c575d36d 100644 --- a/src/MarginTrading.Common/MarginTrading.Common.csproj +++ b/src/MarginTrading.Common/MarginTrading.Common.csproj @@ -6,7 +6,7 @@ false false false - 2.31.0 + 2.29.4 8.0 diff --git a/src/MarginTrading.Contract/MarginTrading.Contract.csproj b/src/MarginTrading.Contract/MarginTrading.Contract.csproj index 0cad1e935..2110a4171 100644 --- a/src/MarginTrading.Contract/MarginTrading.Contract.csproj +++ b/src/MarginTrading.Contract/MarginTrading.Contract.csproj @@ -1,7 +1,7 @@  netstandard2.0 - 2.31.0 + 2.29.4 Lykke.MarginTrading.Contracts 8.0 diff --git a/src/MarginTrading.SqlRepositories/MarginTrading.SqlRepositories.csproj b/src/MarginTrading.SqlRepositories/MarginTrading.SqlRepositories.csproj index 0cc43c6c6..a4f18ffaa 100644 --- a/src/MarginTrading.SqlRepositories/MarginTrading.SqlRepositories.csproj +++ b/src/MarginTrading.SqlRepositories/MarginTrading.SqlRepositories.csproj @@ -6,7 +6,7 @@ false false false - 2.31.0 + 2.29.4 8.0 diff --git a/src/MarginTrading.SqlRepositories/Repositories/OrdersHistoryRepository.cs b/src/MarginTrading.SqlRepositories/Repositories/OrdersHistoryRepository.cs index 4977b17cd..b23e82f38 100644 --- a/src/MarginTrading.SqlRepositories/Repositories/OrdersHistoryRepository.cs +++ b/src/MarginTrading.SqlRepositories/Repositories/OrdersHistoryRepository.cs @@ -34,7 +34,7 @@ CASE [Status] ELSE 99 END as StatusOrder FROM [{0}] oh (NOLOCK) - WHERE oh.ModifiedTimestamp > @Timestamp + WHERE oh.ModifiedTimestamp > @From AND (@To IS NULL OR oh.ModifiedTimestamp <= @To) ), filteredOrderHistWithRowNumber AS ( SELECT *, ROW_NUMBER() OVER (PARTITION BY Id ORDER BY @@ -54,11 +54,11 @@ public OrdersHistoryRepository(string connectionString, string tableName, int ge _getLastSnapshotTimeoutS = getLastSnapshotTimeoutS; } - public async Task> GetLastSnapshot(DateTime @from) + public async Task> GetLastSnapshot(DateTime @from, DateTime? @to = null) { using (var conn = new SqlConnection(_connectionString)) { - var data = await conn.QueryAsync(_select, new { Timestamp = @from }, commandTimeout: _getLastSnapshotTimeoutS); + var data = await conn.QueryAsync(_select, new { From = @from, To = @to }, commandTimeout: _getLastSnapshotTimeoutS); return data.Cast().ToList(); } diff --git a/tests/MarginTradingTests/BaseTests.cs b/tests/MarginTradingTests/BaseTests.cs index aa72a5618..806db0fb1 100644 --- a/tests/MarginTradingTests/BaseTests.cs +++ b/tests/MarginTradingTests/BaseTests.cs @@ -190,6 +190,7 @@ private void RegisterDependenciesCore(bool mockEvents = false) var exchangeConnector = Mock.Of(); builder.RegisterInstance(exchangeConnector).As(); builder.RegisterInstance(Mock.Of()).As(); + builder.RegisterInstance(Mock.Of()).As(); builder.RegisterBuildCallback(c => { diff --git a/tests/MarginTradingTests/Infrastructure/SnapshotValidationServiceTests.cs b/tests/MarginTradingTests/Infrastructure/SnapshotValidationServiceTests.cs index 33dccad2e..3b43f72ea 100644 --- a/tests/MarginTradingTests/Infrastructure/SnapshotValidationServiceTests.cs +++ b/tests/MarginTradingTests/Infrastructure/SnapshotValidationServiceTests.cs @@ -62,8 +62,8 @@ public void SetUp() _orderCacheMock.Setup(o => o.GetPositions()) .Returns(() => _currentPositions.ToImmutableArray()); - _ordersHistoryRepositoryMock.Setup(o => o.GetLastSnapshot(It.IsAny())) - .ReturnsAsync((DateTime date) => _ordersHistory); + _ordersHistoryRepositoryMock.Setup(o => o.GetLastSnapshot(It.IsAny(), It.IsAny())) + .ReturnsAsync((DateTime from, DateTime? to) => _ordersHistory); _positionsHistoryRepositoryMock.Setup(o => o.GetLastSnapshot(It.IsAny())) .ReturnsAsync((DateTime date) => _positionsHistory); diff --git a/tests/MarginTradingTests/Modules/MockRepositoriesModule.cs b/tests/MarginTradingTests/Modules/MockRepositoriesModule.cs index 9cfd403a4..22fd9eede 100644 --- a/tests/MarginTradingTests/Modules/MockRepositoriesModule.cs +++ b/tests/MarginTradingTests/Modules/MockRepositoriesModule.cs @@ -29,7 +29,7 @@ protected override void Load(ContainerBuilder builder) blobRepository.Setup(s => s.ReadWithTimestampAsync>(It.IsAny(), It.IsAny())) .ReturnsAsync((new List(), DateTime.UtcNow)); var orderHistoryRepository = new Mock(); - orderHistoryRepository.Setup(s => s.GetLastSnapshot(It.IsAny())) + orderHistoryRepository.Setup(s => s.GetLastSnapshot(It.IsAny(), It.IsAny())) .ReturnsAsync(new List()); var positionHistoryRepository = new Mock(); var accountHistoryRepository = new Mock(); diff --git a/tests/MarginTradingTests/RfqPauseTests.cs b/tests/MarginTradingTests/RfqPauseTests.cs index 4e017e677..b12e6814c 100644 --- a/tests/MarginTradingTests/RfqPauseTests.cs +++ b/tests/MarginTradingTests/RfqPauseTests.cs @@ -106,7 +106,7 @@ public async Task Acknowledge_When_Active_Pause_Exists_Returns_Success() _repositoryPauseMock .Setup(x => x.FindAsync( "active", - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, It.IsAny>())) .ReturnsAsync(new[] { GetPause(PauseState.Active) }); @@ -124,7 +124,7 @@ public async Task Acknowledge_When_Pending_Pause_Exists_Updates_It_And_Returns_S _repositoryPauseMock .Setup(x => x.FindAsync( "pending", - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, RfqPauseService.ActivePredicate)) .ReturnsAsync(Enumerable.Empty()); @@ -133,7 +133,7 @@ public async Task Acknowledge_When_Pending_Pause_Exists_Updates_It_And_Returns_S _repositoryPauseMock .Setup(x => x.FindAsync( "pending", - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, RfqPauseService.PendingPredicate)) .ReturnsAsync(new[] { GetPersistedPause(PauseState.Pending) }); @@ -173,7 +173,7 @@ public async Task Acknowledge_When_No_Pause_Exists_Returns_Failure() _repositoryPauseMock .Setup(x => x.FindAsync( It.IsAny(), - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, It.IsAny>())) .ReturnsAsync(Enumerable.Empty()); @@ -191,7 +191,7 @@ public async Task StopPending_When_Pending_Exists_Updates_State_To_Cancelled() _repositoryPauseMock .Setup(x => x.FindAsync( "pending", - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, RfqPauseService.PendingPredicate)) .ReturnsAsync(new[] { GetPersistedPause(PauseState.Pending) }); @@ -217,7 +217,7 @@ public async Task AcknowledgeCancellation_When_PendingCancellation_Exists_Update _repositoryPauseMock .Setup(x => x.FindAsync( "pending cancellation", - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, RfqPauseService.PendingCancellationPredicate)) .ReturnsAsync(new[] { GetPersistedPause(PauseState.PendingCancellation) }); @@ -257,7 +257,7 @@ public async Task AcknowledgeCancellation_When_NoPause_Exists_Returns_Failure() _repositoryPauseMock .Setup(x => x.FindAsync( It.IsAny(), - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, It.IsAny>())) .ReturnsAsync(Enumerable.Empty()); @@ -301,7 +301,7 @@ public async Task Resume_When_ThereIsNo_Active_Pause_Returns_Error() _repositoryPauseMock .Setup(x => x.FindAsync( It.IsAny(), - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, RfqPauseService.ActivePredicate)) .ReturnsAsync(Enumerable.Empty()); @@ -328,7 +328,7 @@ public async Task Resume_Manually_When_Paused_Not_Manually_Returns_Error() _repositoryPauseMock .Setup(x => x.FindAsync( "active", - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, RfqPauseService.ActivePredicate)) .ReturnsAsync(new[] { GetPersistedPause(PauseState.Active, PauseSource.TradingDisabled) }); @@ -355,7 +355,7 @@ public async Task Resume_When_ActivePauseExists_Updates_It_And_Returns_Success() _repositoryPauseMock .Setup(x => x.FindAsync( "active", - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, RfqPauseService.ActivePredicate)) .ReturnsAsync(new[] { GetPersistedPause(PauseState.Active) }); @@ -514,7 +514,7 @@ private static Pause GetPersistedPause(PauseState? state = null, PauseSource? so private static IOperationExecutionInfo GetExecutionInfoWithState(SpecialLiquidationOperationState? state = null) { var result = new OperationExecutionInfo( - SpecialLiquidationSaga.OperationName, + SpecialLiquidationSaga.Name, "id", DateTime.UtcNow, new SpecialLiquidationOperationData()); diff --git a/tests/MarginTradingTests/WorkflowTests/SpecialLiquidationFailedEventHandlerTests.cs b/tests/MarginTradingTests/WorkflowTests/SpecialLiquidationFailedEventHandlerTests.cs new file mode 100644 index 000000000..14e2d4c83 --- /dev/null +++ b/tests/MarginTradingTests/WorkflowTests/SpecialLiquidationFailedEventHandlerTests.cs @@ -0,0 +1,140 @@ +// Copyright (c) 2019 Lykke Corp. +// See the LICENSE file in the project root for more information. + +using System; +using System.Threading.Tasks; +using MarginTrading.Backend.Core; +using MarginTrading.Backend.Core.Settings; +using MarginTrading.Backend.Services.Workflow; +using Moq; +using NUnit.Framework; +using ExecutionInfo = MarginTrading.Backend.Core.IOperationExecutionInfo; + +namespace MarginTradingTests.WorkflowTests +{ + [TestFixture] + public class SpecialLiquidationFailedEventHandlerTests + { + [Test] + public async Task DetermineNextAction_ReturnsComplete_WhenInstrumentIsDiscontinued() + { + var result = await SpecialLiquidationFailedEventHandler.DetermineNextAction(null, + true, + _ => false, + false, + _ => Task.FromResult(false), + new SpecialLiquidationSettings()); + + Assert.AreEqual(SpecialLiquidationFailedEventHandler.NextAction.Complete, result); + } + + [Test] + public async Task DetermineNextAction_ReturnsCancel_WhenLiquidityIsEnough() + { + var executionInfo = Mock.Of(x => + x.Data == new SpecialLiquidationOperationData { RequestedFromCorporateActions = false }); + + var result = await SpecialLiquidationFailedEventHandler.DetermineNextAction(executionInfo, + false, + _ => true, + false, + _ => Task.FromResult(false), + new SpecialLiquidationSettings()); + + Assert.AreEqual(SpecialLiquidationFailedEventHandler.NextAction.Cancel, result); + } + + [Test] + public void DetermineNextAction_ChecksLiquidity_IfOnly_NotInitiatedByCorporateActions() + { + var executionInfo = Mock.Of(x => + x.Data == new SpecialLiquidationOperationData { RequestedFromCorporateActions = false }); + + var ex = Assert.ThrowsAsync(async () => + await SpecialLiquidationFailedEventHandler.DetermineNextAction(executionInfo, + false, + _ => throw new InvalidOperationException("Liquidity check expected"), + false, + _ => Task.FromResult(false), + new SpecialLiquidationSettings())); + + Assert.NotNull(ex); + Assert.AreEqual("Liquidity check expected", ex.Message); + } + + [Test] + public async Task DetermineNextAction_ReturnsRetryPriceRequest_WhenRetryIsRequiredAndCanRetryPriceRequest() + { + var executionInfo = Mock.Of(x => + x.Data == new SpecialLiquidationOperationData { RequestedFromCorporateActions = true }); // to skip liquidity check + + var result = await SpecialLiquidationFailedEventHandler.DetermineNextAction(executionInfo, + false, + _ => false, + true, + _ => Task.FromResult(false), + new SpecialLiquidationSettings + { PriceRequestRetryTimeout = TimeSpan.Zero, RetryPriceRequestForCorporateActions = true }); + + Assert.AreEqual(SpecialLiquidationFailedEventHandler.NextAction.RetryPriceRequest, result); + } + + [Test] + public async Task DetermineNextAction_ChecksForPause_BeforeRetryingPriceRequest() + { + var executionInfo = Mock.Of(x => + x.Data == new SpecialLiquidationOperationData { RequestedFromCorporateActions = true }); // to skip liquidity check + + var result = await SpecialLiquidationFailedEventHandler.DetermineNextAction(executionInfo, + false, + _ => false, + true, + _ => Task.FromResult(true), + new SpecialLiquidationSettings + { PriceRequestRetryTimeout = TimeSpan.Zero, RetryPriceRequestForCorporateActions = true }); + + Assert.AreEqual(SpecialLiquidationFailedEventHandler.NextAction.Pause, result); + } + + [Test] + public async Task DetermineNextAction_ResumesInitialFlow_WhenHasCausingLiquidation() + { + var executionInfo = Mock.Of(x => + x.Data == new SpecialLiquidationOperationData + { + CausationOperationId = "123", + RequestedFromCorporateActions = true + }); + + var result = await SpecialLiquidationFailedEventHandler.DetermineNextAction(executionInfo, + false, + _ => false, + false, + _ => Task.FromResult(false), + new SpecialLiquidationSettings + { PriceRequestRetryTimeout = TimeSpan.Zero, RetryPriceRequestForCorporateActions = true }); + + Assert.AreEqual(SpecialLiquidationFailedEventHandler.NextAction.ResumeInitialFlow, result); + } + + [Test] + public async Task DetermineNextAction_ReturnsComplete_WhenAsDefault() + { + var executionInfo = Mock.Of(x => + x.Data == new SpecialLiquidationOperationData + { + CausationOperationId = null, + RequestedFromCorporateActions = false + }); + + var result = await SpecialLiquidationFailedEventHandler.DetermineNextAction(executionInfo, + false, + _ => false, + false, + _ => Task.FromResult(false), + new SpecialLiquidationSettings { PriceRequestRetryTimeout = null, }); + + Assert.AreEqual(SpecialLiquidationFailedEventHandler.NextAction.Complete, result); + } + } +} \ No newline at end of file