Skip to content

Commit

Permalink
Merge pull request #460 from LykkeBusiness/LT-5061-order-does-not-exe…
Browse files Browse the repository at this point in the history
…cute-upon-contribution

fix(LT-5061): let special liquidation fail
  • Loading branch information
tarurar authored Mar 22, 2024
2 parents bd16c57 + 83f2dde commit c251b06
Show file tree
Hide file tree
Showing 16 changed files with 709 additions and 252 deletions.
1 change: 1 addition & 0 deletions MarginTrading.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EFeature_002EServices_002ECodeCleanup_002EFileHeader_002EFileHeaderSettingsMigrate/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=appsettings/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Bitcoin/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=cqrs/@EntryIndexedValue">True</s:Boolean>
Expand Down
14 changes: 11 additions & 3 deletions src/MarginTrading.Backend.Core/Extensions/SagaExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public static bool SwitchState<TState>(this OperationDataBase<TState> data, TSta

if (Convert.ToInt32(data.State) > Convert.ToInt32(expectedState))
{
LogLocator.CommonLog.WriteWarning(nameof(SagaExtensions), nameof(SwitchState),
LogLocator.CommonLog.WriteWarning(nameof(SagaExtensions), nameof(SwitchToState),
$"Operation is already in the next state, so this event is ignored, {new {data, expectedState, nextState}.ToJson()}.");
return false;
}
Expand All @@ -38,6 +38,14 @@ public static bool SwitchState<TState>(this OperationDataBase<TState> data, TSta
return true;
}

public static bool SwitchToState(this OperationDataBase<SpecialLiquidationOperationState> data,
SpecialLiquidationOperationState nextState) =>
data.SwitchState(data.State, nextState);

public static bool SwitchToState(this IOperationExecutionInfo<SpecialLiquidationOperationData> info,
SpecialLiquidationOperationState nextState) =>
info.Data.SwitchToState(nextState);

public static bool SwitchState(this OperationDataBase<SpecialLiquidationOperationState> data,
SpecialLiquidationOperationState expectedState, SpecialLiquidationOperationState nextState)
{
Expand All @@ -55,15 +63,15 @@ public static bool SwitchState(this OperationDataBase<SpecialLiquidationOperatio

if (Convert.ToInt32(data.State) > Convert.ToInt32(expectedState))
{
LogLocator.CommonLog.WriteWarning(nameof(SagaExtensions), nameof(SwitchState),
LogLocator.CommonLog.WriteWarning(nameof(SagaExtensions), nameof(SwitchToState),
$"Operation is already in the next state, so this event is ignored, {new {data, expectedState, nextState}.ToJson()}.");
return false;
}

if (data.State == SpecialLiquidationOperationState.Failed &&
nextState == SpecialLiquidationOperationState.Cancelled)
{
LogLocator.CommonLog.WriteWarning(nameof(SagaExtensions), nameof(SwitchState),
LogLocator.CommonLog.WriteWarning(nameof(SagaExtensions), nameof(SwitchToState),
$"Cannot switch from Failed to Cancelled state (both states are final), so this event is ignored, {new {data, expectedState, nextState}.ToJson()}.");
return false;
}
Expand Down
110 changes: 109 additions & 1 deletion src/MarginTrading.Backend.Services/Helpers/LiquidationHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,23 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Common;
using Lykke.Cqrs;
using MarginTrading.Backend.Contracts.Workflow.SpecialLiquidation.Commands;
using MarginTrading.Backend.Core;
using MarginTrading.Backend.Core.Exceptions;
using MarginTrading.Backend.Core.Extensions;
using MarginTrading.Backend.Core.MatchingEngines;
using MarginTrading.Backend.Core.Orders;
using MarginTrading.Backend.Core.Repositories;
using MarginTrading.Backend.Core.Settings;
using MarginTrading.Backend.Core.Trading;
using MarginTrading.Backend.Services.AssetPairs;
using MarginTrading.Backend.Services.Infrastructure;
using MarginTrading.Backend.Services.Services;
using MarginTrading.Backend.Services.Workflow.Liquidation.Commands;
using MarginTrading.Backend.Services.Workflow.SpecialLiquidation.Commands;
using MarginTrading.Common.Services;
using MoreLinq;

Expand All @@ -25,19 +33,34 @@ public class LiquidationHelper
private readonly ICqrsSender _cqrsSender;
private readonly OrdersCache _ordersCache;
private readonly IAssetPairDayOffService _assetPairDayOffService;
private readonly IAssetPairsCache _assetPairsCache;
private readonly CqrsContextNamesSettings _cqrsContextNamesSettings;
private readonly MarginTradingSettings _marginTradingSettings;
private readonly IOperationExecutionInfoRepository _operationExecutionInfoRepository;
private readonly IRfqService _specialLiquidationService;


public LiquidationHelper(IMatchingEngineRouter matchingEngineRouter,
IDateService dateService,
ICqrsSender cqrsSender,
OrdersCache ordersCache,
IAssetPairDayOffService assetPairDayOffService)
IAssetPairDayOffService assetPairDayOffService,
IAssetPairsCache assetPairsCache,
CqrsContextNamesSettings cqrsContextNamesSettings,
IOperationExecutionInfoRepository operationExecutionInfoRepository,
IRfqService specialLiquidationService,
MarginTradingSettings marginTradingSettings)
{
_matchingEngineRouter = matchingEngineRouter;
_dateService = dateService;
_cqrsSender = cqrsSender;
_ordersCache = ordersCache;
_assetPairDayOffService = assetPairDayOffService;
_assetPairsCache = assetPairsCache;
_cqrsContextNamesSettings = cqrsContextNamesSettings;
_operationExecutionInfoRepository = operationExecutionInfoRepository;
_specialLiquidationService = specialLiquidationService;
_marginTradingSettings = marginTradingSettings;
}

public bool CheckIfNetVolumeCanBeLiquidated(string assetPairId, Position[] positions,
Expand Down Expand Up @@ -138,6 +161,91 @@ public bool CheckIfNetVolumeCanBeLiquidated(string assetPairId, Position[] posit

return result;
}

public async Task<bool> FailIfInstrumentDiscontinued(IOperationExecutionInfo<SpecialLiquidationOperationData> executionInfo, ICommandSender sender)
{
var isDiscontinued = _assetPairsCache.GetAssetPairById(executionInfo.Data.Instrument).IsDiscontinued;

if (isDiscontinued)
{
if (executionInfo.Data.SwitchState(SpecialLiquidationOperationState.PriceRequested,
SpecialLiquidationOperationState.OnTheWayToFail))
{
sender.SendCommand(new FailSpecialLiquidationInternalCommand
{
OperationId = executionInfo.Id,
CreationTime = _dateService.Now(),
Reason = "Instrument discontinuation",

}, _cqrsContextNamesSettings.TradingEngine);

await _operationExecutionInfoRepository.Save(executionInfo);
}

return true;
}

return false;
}

public async Task InternalRetryPriceRequest(DateTime eventCreationTime,
ICommandSender sender,
IOperationExecutionInfo<SpecialLiquidationOperationData> executionInfo,
TimeSpan retryTimeout)
{
// fix the intention to make another price request to not let the parallel
// ongoing GetPriceForSpecialLiquidationTimeoutInternalCommand execution
// break (fail) the flow
executionInfo.Data.NextRequestNumber();
await _operationExecutionInfoRepository.Save(executionInfo);

var shouldRetryAfter = eventCreationTime.Add(retryTimeout);

var timeLeftBeforeRetry = shouldRetryAfter - _dateService.Now();

if (timeLeftBeforeRetry > TimeSpan.Zero)
{
await Task.Delay(timeLeftBeforeRetry);
}

RequestPrice(sender, executionInfo);
}

public void RequestPrice(ICommandSender sender, IOperationExecutionInfo<SpecialLiquidationOperationData>
executionInfo)
{
//hack, requested by the bank
var positionsVolume = executionInfo.Data.Volume != 0 ? executionInfo.Data.Volume : 1;

var command = new GetPriceForSpecialLiquidationCommand
{
OperationId = executionInfo.Id,
CreationTime = _dateService.Now(),
Instrument = executionInfo.Data.Instrument,
Volume = positionsVolume,
RequestNumber = executionInfo.Data.RequestNumber,
RequestedFromCorporateActions = executionInfo.Data.RequestedFromCorporateActions
};

if (_marginTradingSettings.ExchangeConnector == ExchangeConnectorType.RealExchangeConnector)
{
//send it to the Gavel
sender.SendCommand(command, _cqrsContextNamesSettings.Gavel);
}
else
{
_specialLiquidationService.SavePriceRequestForSpecialLiquidation(command);
}

//special command is sent instantly for timeout control.. it is retried until timeout occurs
sender.SendCommand(new GetPriceForSpecialLiquidationTimeoutInternalCommand
{
OperationId = executionInfo.Id,
CreationTime = _dateService.Now(),
TimeoutSeconds = _marginTradingSettings.SpecialLiquidation.PriceRequestTimeoutSec,
RequestNumber = executionInfo.Data.RequestNumber
}, _cqrsContextNamesSettings.TradingEngine);
}

public static string GetComment(LiquidationType liquidationType)
{
Expand Down
4 changes: 4 additions & 0 deletions src/MarginTrading.Backend.Services/Modules/CqrsModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ protected override void Load(ContainerBuilder builder)
.SingleInstance();
builder.RegisterInstance(new CqrsContextNamesSettings()).AsSelf().SingleInstance();

builder.RegisterType<SpecialLiquidationFailedEventHandler>()
.AsImplementedInterfaces()
.SingleInstance();

// Sagas & command handlers
builder.RegisterAssemblyTypes(GetType().Assembly).Where(t =>
new[] { "Saga", "CommandsHandler", "Projection" }.Any(ending => t.Name.EndsWith(ending))).AsSelf();
Expand Down
32 changes: 16 additions & 16 deletions src/MarginTrading.Backend.Services/Services/RfqPauseService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public async Task<RfqPauseErrorCode> AddAsync(string operationId, PauseSource so
{
var existingPause = (await _pauseRepository.FindAsync(
operationId,
SpecialLiquidationSaga.OperationName,
SpecialLiquidationSaga.Name,
NotCancelledPredicate))
.SingleOrDefault();

Expand All @@ -84,7 +84,7 @@ public async Task<RfqPauseErrorCode> AddAsync(string operationId, PauseSource so
}

var executionInfo = await _executionInfoRepository
.GetAsync<SpecialLiquidationOperationData>(SpecialLiquidationSaga.OperationName, operationId);
.GetAsync<SpecialLiquidationOperationData>(SpecialLiquidationSaga.Name, operationId);

if (executionInfo == null)
return RfqPauseErrorCode.NotFound;
Expand All @@ -99,7 +99,7 @@ await _log.WriteWarningAsync(nameof(RfqPauseService), nameof(AddAsync),

var pause = Pause.Create(
operationId,
SpecialLiquidationSaga.OperationName,
SpecialLiquidationSaga.Name,
source,
initiator,
_dateService.Now());
Expand All @@ -123,7 +123,7 @@ public async Task<Pause> GetCurrentAsync(string operationId)

return (await _pauseRepository.FindAsync(
operationId,
SpecialLiquidationSaga.OperationName,
SpecialLiquidationSaga.Name,
NotCancelledPredicate))
.SingleOrDefault();
}
Expand All @@ -138,21 +138,21 @@ public async Task<bool> AcknowledgeAsync(string operationId)
{
var activePause = (await _pauseRepository.FindAsync(
operationId,
SpecialLiquidationSaga.OperationName,
SpecialLiquidationSaga.Name,
ActivePredicate))
.SingleOrDefault();

if (activePause != null)
{
await _log.WriteInfoAsync(nameof(RfqPauseService), nameof(AcknowledgeAsync), null,
$"The pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.OperationName}] is effective since [{activePause.EffectiveSince}]");
$"The pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.Name}] is effective since [{activePause.EffectiveSince}]");

return true;
}

var pendingPause = (await _pauseRepository.FindAsync(
operationId,
SpecialLiquidationSaga.OperationName,
SpecialLiquidationSaga.Name,
PendingPredicate))
.SingleOrDefault();

Expand All @@ -173,7 +173,7 @@ await _log.WriteInfoAsync(nameof(RfqPauseService), nameof(AcknowledgeAsync), nul
if (!updated)
{
await _log.WriteWarningAsync(nameof(RfqPauseService), nameof(AcknowledgeAsync), null,
$"Couldn't activate pending pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.OperationName}]");
$"Couldn't activate pending pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.Name}]");

return false;
}
Expand Down Expand Up @@ -201,7 +201,7 @@ public async Task StopPendingAsync(string operationId, PauseCancellationSource s
{
var pendingPause = (await _pauseRepository.FindAsync(
operationId,
SpecialLiquidationSaga.OperationName,
SpecialLiquidationSaga.Name,
PendingPredicate))
.SingleOrDefault();

Expand All @@ -222,7 +222,7 @@ public async Task StopPendingAsync(string operationId, PauseCancellationSource s
if (!updated)
{
await _log.WriteWarningAsync(nameof(RfqPauseService), nameof(StopPendingAsync), null,
$"Couldn't stop pending pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.OperationName}]");
$"Couldn't stop pending pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.Name}]");
}
}

Expand All @@ -244,7 +244,7 @@ public async Task<bool> AcknowledgeCancellationAsync(string operationId)
{
var pendingCancellationPause = (await _pauseRepository.FindAsync(
operationId,
SpecialLiquidationSaga.OperationName,
SpecialLiquidationSaga.Name,
PendingCancellationPredicate))
.SingleOrDefault();

Expand All @@ -265,7 +265,7 @@ public async Task<bool> AcknowledgeCancellationAsync(string operationId)
if (!updated)
{
await _log.WriteWarningAsync(nameof(RfqPauseService), nameof(AcknowledgeCancellationAsync), null,
$"Couldn't cancel pending cancellation pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.OperationName}]");
$"Couldn't cancel pending cancellation pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.Name}]");

return false;
}
Expand Down Expand Up @@ -295,21 +295,21 @@ public async Task<RfqResumeErrorCode> ResumeAsync(string operationId, PauseCance
try
{
var executionInfo = await _executionInfoRepository
.GetAsync<SpecialLiquidationOperationData>(SpecialLiquidationSaga.OperationName, operationId);
.GetAsync<SpecialLiquidationOperationData>(SpecialLiquidationSaga.Name, operationId);

if (executionInfo == null)
return RfqResumeErrorCode.NotFound;

var activePause = (await _pauseRepository.FindAsync(
operationId,
SpecialLiquidationSaga.OperationName,
SpecialLiquidationSaga.Name,
ActivePredicate))
.SingleOrDefault();

if (activePause == null)
{
await _log.WriteInfoAsync(nameof(RfqPauseService), nameof(ResumeAsync), null,
$"The active pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.OperationName}] was not found");
$"The active pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.Name}] was not found");

return RfqResumeErrorCode.NotPaused;
}
Expand Down Expand Up @@ -342,7 +342,7 @@ await _log.WriteWarningAsync(nameof(RfqPauseService), nameof(ResumeAsync), null,
else
{
await _log.WriteWarningAsync(nameof(RfqPauseService), nameof(ResumeAsync), null,
$"Couldn't cancel active pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.OperationName}] due to database issues");
$"Couldn't cancel active pause for operation id [{operationId}] and name [{SpecialLiquidationSaga.Name}] due to database issues");

return RfqResumeErrorCode.Persistence;
}
Expand Down
18 changes: 18 additions & 0 deletions src/MarginTrading.Backend.Services/Workflow/ISagaEventHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) 2019 Lykke Corp.
// See the LICENSE file in the project root for more information.

using System.Threading.Tasks;
using Lykke.Cqrs;

namespace MarginTrading.Backend.Services.Workflow
{
/// <summary>
/// Base interface for all saga event handlers
/// </summary>
public interface ISagaEventHandler<in TEvent>
{
Task Handle(TEvent @event, ICommandSender sender);

Task<bool> CanHandle(TEvent @event) => Task.FromResult(true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) 2019 Lykke Corp.
// See the LICENSE file in the project root for more information.

namespace MarginTrading.Backend.Services.Workflow
{
/// <summary>
/// Marker interface for special liquidation saga event handlers
/// </summary>
public interface ISpecialLiquidationSagaEventHandler
{
}
}
Loading

0 comments on commit c251b06

Please sign in to comment.