From 54a5f8dcdbd1a39035272052c30e549550357b96 Mon Sep 17 00:00:00 2001 From: Gennadii Ponomarev Date: Tue, 28 May 2024 13:15:04 +0500 Subject: [PATCH 1/3] fix(LT-5508): orders and positions are fixed during state validation - orders and positions are fixed during state validation - the fixed values are then used to create a draft snapshot - a retry policy added so the "trading data snapshot might be corrupted" fatal errors should be less frequent --- .../Orders/IOrderReader.cs | 18 +++++ .../Repositories/IOrdersHistoryRepository.cs | 2 +- .../IPositionsHistoryRepository.cs | 2 +- .../Snapshots/InMemorySnapshot.cs | 53 +++++++++++++++ .../Snapshots/SnapshotValidationResult.cs | 5 ++ .../Snapshots/ValidationResult.cs | 2 +- .../Caches/OrderCache.cs | 10 --- .../Infrastructure/SnapshotService.cs | 65 +++++++++++-------- .../SnapshotValidationService.cs | 22 +++++-- .../MarginTrading.Backend.Services.csproj | 1 + .../Modules/CacheModule.cs | 1 + .../Policies/SnapshotStateValidationPolicy.cs | 27 ++++++++ .../TradingInstrumentsCacheService.cs | 1 + .../Controllers/AccountsController.cs | 1 + .../Repositories/OrdersHistoryRepository.cs | 14 ++-- .../PositionsHistoryRepository.cs | 14 ++-- .../SnapshotValidationServiceTests.cs | 4 +- .../Modules/MockRepositoriesModule.cs | 4 +- .../ProductChangedProjectionTests.cs | 2 +- 19 files changed, 183 insertions(+), 65 deletions(-) create mode 100644 src/MarginTrading.Backend.Core/Orders/IOrderReader.cs create mode 100644 src/MarginTrading.Backend.Core/Snapshots/InMemorySnapshot.cs create mode 100644 src/MarginTrading.Backend.Services/Policies/SnapshotStateValidationPolicy.cs diff --git a/src/MarginTrading.Backend.Core/Orders/IOrderReader.cs b/src/MarginTrading.Backend.Core/Orders/IOrderReader.cs new file mode 100644 index 000000000..e7266a2ce --- /dev/null +++ b/src/MarginTrading.Backend.Core/Orders/IOrderReader.cs @@ -0,0 +1,18 @@ +// Copyright (c) 2019 Lykke Corp. +// See the LICENSE file in the project root for more information. + +using System.Collections.Immutable; +using MarginTrading.Backend.Core.Trading; + +namespace MarginTrading.Backend.Core.Orders +{ + public interface IOrderReader + { + ImmutableArray GetAllOrders(); + ImmutableArray GetPositions(); + ImmutableArray GetPositions(string instrument); + ImmutableArray GetPositionsByFxAssetPairId(string fxAssetPairId); + ImmutableArray GetPending(); + bool TryGetOrderById(string orderId, out Order order); + } +} \ No newline at end of file diff --git a/src/MarginTrading.Backend.Core/Repositories/IOrdersHistoryRepository.cs b/src/MarginTrading.Backend.Core/Repositories/IOrdersHistoryRepository.cs index 500154f82..93e421689 100644 --- a/src/MarginTrading.Backend.Core/Repositories/IOrdersHistoryRepository.cs +++ b/src/MarginTrading.Backend.Core/Repositories/IOrdersHistoryRepository.cs @@ -10,6 +10,6 @@ namespace MarginTrading.Backend.Core.Repositories { public interface IOrdersHistoryRepository { - Task> GetLastSnapshot(DateTime @from); + Task> GetLastSnapshot(DateTime from, DateTime? to = null); } } diff --git a/src/MarginTrading.Backend.Core/Repositories/IPositionsHistoryRepository.cs b/src/MarginTrading.Backend.Core/Repositories/IPositionsHistoryRepository.cs index fe7f10926..91c310daa 100644 --- a/src/MarginTrading.Backend.Core/Repositories/IPositionsHistoryRepository.cs +++ b/src/MarginTrading.Backend.Core/Repositories/IPositionsHistoryRepository.cs @@ -10,6 +10,6 @@ namespace MarginTrading.Backend.Core.Repositories { public interface IPositionsHistoryRepository { - Task> GetLastSnapshot(DateTime @from); + Task> GetLastSnapshot(DateTime from, DateTime? to = null); } } diff --git a/src/MarginTrading.Backend.Core/Snapshots/InMemorySnapshot.cs b/src/MarginTrading.Backend.Core/Snapshots/InMemorySnapshot.cs new file mode 100644 index 000000000..1db203ca1 --- /dev/null +++ b/src/MarginTrading.Backend.Core/Snapshots/InMemorySnapshot.cs @@ -0,0 +1,53 @@ +// Copyright (c) 2019 Lykke Corp. +// See the LICENSE file in the project root for more information. + +using System.Collections.Immutable; +using System.Linq; +using MarginTrading.Backend.Core.Orders; +using MarginTrading.Backend.Core.Trading; + +namespace MarginTrading.Backend.Core.Snapshots +{ + public class InMemorySnapshot : IOrderReader + { + private readonly ImmutableArray _orders; + private readonly ImmutableArray _positions; + + public InMemorySnapshot(ImmutableArray orders, ImmutableArray positions) + { + _orders = orders; + _positions = positions; + } + + public ImmutableArray GetAllOrders() + { + return _orders; + } + + public ImmutableArray GetPositions() + { + return _positions; + } + + public ImmutableArray GetPositions(string instrument) + { + return _positions.Where(x => x.AssetPairId == instrument).ToImmutableArray(); + } + + public ImmutableArray GetPositionsByFxAssetPairId(string fxAssetPairId) + { + return _positions.Where(x => x.FxAssetPairId == fxAssetPairId).ToImmutableArray(); + } + + public ImmutableArray GetPending() + { + return _orders.Where(x => x.Status == OrderStatus.Active).ToImmutableArray(); + } + + public bool TryGetOrderById(string orderId, out Order order) + { + order = _orders.FirstOrDefault(x => x.Id == orderId); + return order != null; + } + } +} \ No newline at end of file diff --git a/src/MarginTrading.Backend.Core/Snapshots/SnapshotValidationResult.cs b/src/MarginTrading.Backend.Core/Snapshots/SnapshotValidationResult.cs index bb53e7e1d..f6cb490d9 100644 --- a/src/MarginTrading.Backend.Core/Snapshots/SnapshotValidationResult.cs +++ b/src/MarginTrading.Backend.Core/Snapshots/SnapshotValidationResult.cs @@ -1,6 +1,9 @@ // Copyright (c) 2019 Lykke Corp. // See the LICENSE file in the project root for more information. +using MarginTrading.Backend.Core.Orders; +using MarginTrading.Backend.Core.Trading; + namespace MarginTrading.Backend.Core.Snapshots { /// @@ -15,5 +18,7 @@ public class SnapshotValidationResult public ValidationResult Positions { get; set; } public string PreviousSnapshotCorrelationId { get; set; } + + public IOrderReader Cache { get; set; } } } \ No newline at end of file diff --git a/src/MarginTrading.Backend.Core/Snapshots/ValidationResult.cs b/src/MarginTrading.Backend.Core/Snapshots/ValidationResult.cs index e5976040c..3c162e325 100644 --- a/src/MarginTrading.Backend.Core/Snapshots/ValidationResult.cs +++ b/src/MarginTrading.Backend.Core/Snapshots/ValidationResult.cs @@ -9,7 +9,7 @@ namespace MarginTrading.Backend.Core.Snapshots /// /// Represent result of trading state validation for entity. /// - /// + /// Short representation of entity public class ValidationResult { /// diff --git a/src/MarginTrading.Backend.Services/Caches/OrderCache.cs b/src/MarginTrading.Backend.Services/Caches/OrderCache.cs index 7b6b8d51c..11ffcde91 100644 --- a/src/MarginTrading.Backend.Services/Caches/OrderCache.cs +++ b/src/MarginTrading.Backend.Services/Caches/OrderCache.cs @@ -11,16 +11,6 @@ namespace MarginTrading.Backend.Services { - public interface IOrderReader - { - ImmutableArray GetAllOrders(); - ImmutableArray GetPositions(); - ImmutableArray GetPositions(string instrument); - ImmutableArray GetPositionsByFxAssetPairId(string fxAssetPairId); - ImmutableArray GetPending(); - bool TryGetOrderById(string orderId, out Order order); - } - public class OrdersCache : IOrderReader { public OrdersCache() diff --git a/src/MarginTrading.Backend.Services/Infrastructure/SnapshotService.cs b/src/MarginTrading.Backend.Services/Infrastructure/SnapshotService.cs index bc2631d68..f0bf638cc 100644 --- a/src/MarginTrading.Backend.Services/Infrastructure/SnapshotService.cs +++ b/src/MarginTrading.Backend.Services/Infrastructure/SnapshotService.cs @@ -16,8 +16,10 @@ using MarginTrading.Backend.Core.Snapshots; using MarginTrading.Backend.Services.AssetPairs; using MarginTrading.Backend.Services.Mappers; +using MarginTrading.Backend.Services.Policies; using MarginTrading.Common.Services; using MoreLinq; +using Polly.Retry; namespace MarginTrading.Backend.Services.Infrastructure { @@ -27,7 +29,6 @@ public class SnapshotService : ISnapshotService private readonly IAccountsCacheService _accountsCacheService; private readonly IQuoteCacheService _quoteCacheService; private readonly IFxRateCacheService _fxRateCacheService; - private readonly IOrderReader _orderReader; private readonly IDateService _dateService; private readonly ITradingEngineSnapshotsRepository _tradingEngineSnapshotsRepository; @@ -42,12 +43,13 @@ public class SnapshotService : ISnapshotService private static readonly SemaphoreSlim Lock = new SemaphoreSlim(1, 1); public static bool IsMakingSnapshotInProgress => Lock.CurrentCount == 0; + private AsyncRetryPolicy _policy; + public SnapshotService( IScheduleSettingsCacheService scheduleSettingsCacheService, IAccountsCacheService accountsCacheService, IQuoteCacheService quoteCacheService, IFxRateCacheService fxRateCacheService, - IOrderReader orderReader, IDateService dateService, ITradingEngineSnapshotsRepository tradingEngineSnapshotsRepository, ISnapshotValidationService snapshotValidationService, @@ -62,7 +64,6 @@ public SnapshotService( _accountsCacheService = accountsCacheService; _quoteCacheService = quoteCacheService; _fxRateCacheService = fxRateCacheService; - _orderReader = orderReader; _dateService = dateService; _tradingEngineSnapshotsRepository = tradingEngineSnapshotsRepository; _snapshotValidationService = snapshotValidationService; @@ -72,6 +73,8 @@ public SnapshotService( _finalSnapshotCalculator = finalSnapshotCalculator; _snapshotStatusTracker = snapshotStatusTracker; _settings = settings; + + _policy = SnapshotStateValidationPolicy.BuildPolicy(log); } /// @@ -103,38 +106,24 @@ public async Task MakeTradingDataSnapshot(DateTime tradingDay, string co // If one or more queues contain not delivered messages the snapshot can not be created. _queueValidationService.ThrowExceptionIfQueuesNotEmpty(true); - // Before starting snapshot creation the current state should be validated. - var validationResult = await _snapshotValidationService.ValidateCurrentStateAsync(); - - if (!validationResult.IsValid) - { - var ex = new InvalidOperationException( - $"The trading data snapshot might be corrupted. The current state of orders and positions is incorrect. Check the dbo.BlobData table for more info: container {LykkeConstants.MtCoreSnapshotBlobContainer}, correlationId {correlationId}"); - await _log.WriteFatalErrorAsync(nameof(SnapshotService), - nameof(MakeTradingDataSnapshot), - validationResult.ToJson(), - ex); - await _blobRepository.WriteAsync(LykkeConstants.MtCoreSnapshotBlobContainer, correlationId, validationResult); - } - else - { - await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapshot), - "The current state of orders and positions is correct."); - } - await Lock.WaitAsync(); try { _snapshotStatusTracker.SnapshotInProgress(); - var orders = _orderReader.GetAllOrders(); - var ordersJson = orders.Select(o => o.ConvertToSnapshotContract(_orderReader, status)).ToJson(); + var validationResult = await _policy.ExecuteAsync(() => Validate(correlationId)); + + // orders and positions are fixed at the moment of validation + var orders = validationResult.Cache.GetAllOrders(); + var ordersJson = orders + .Select(o => o.ConvertToSnapshotContract(validationResult.Cache, status)).ToJson(); await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapshot), $"Preparing data... {orders.Length} orders prepared."); - var positions = _orderReader.GetPositions(); - var positionsJson = positions.Select(p => p.ConvertToSnapshotContract(_orderReader, status)).ToJson(); + var positions = validationResult.Cache.GetPositions(); + var positionsJson = positions + .Select(p => p.ConvertToSnapshotContract(validationResult.Cache, status)).ToJson(); await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapshot), $"Preparing data... {positions.Length} positions prepared."); @@ -215,6 +204,30 @@ await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapsho } } + private async Task Validate(string correlationId) + { + // Before starting snapshot creation the current state should be validated. + var validationResult = await _snapshotValidationService.ValidateCurrentStateAsync(); + + if (!validationResult.IsValid) + { + var ex = new InvalidOperationException( + $"The trading data snapshot might be corrupted. The current state of orders and positions is incorrect. Check the dbo.BlobData table for more info: container {LykkeConstants.MtCoreSnapshotBlobContainer}, correlationId {correlationId}"); + await _log.WriteFatalErrorAsync(nameof(SnapshotService), + nameof(MakeTradingDataSnapshot), + validationResult.ToJson(), + ex); + await _blobRepository.WriteAsync(LykkeConstants.MtCoreSnapshotBlobContainer, correlationId, validationResult); + } + else + { + await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapshot), + "The current state of orders and positions is correct."); + } + + return validationResult; + } + /// public async Task MakeTradingDataSnapshotFromDraft( string correlationId, diff --git a/src/MarginTrading.Backend.Services/Infrastructure/SnapshotValidationService.cs b/src/MarginTrading.Backend.Services/Infrastructure/SnapshotValidationService.cs index cfb4253d5..06749debc 100644 --- a/src/MarginTrading.Backend.Services/Infrastructure/SnapshotValidationService.cs +++ b/src/MarginTrading.Backend.Services/Infrastructure/SnapshotValidationService.cs @@ -52,8 +52,15 @@ public async Task ValidateCurrentStateAsync() { await _log.WriteInfoAsync(nameof(SnapshotValidationService), nameof(ValidateCurrentStateAsync), $"Snapshot validation started: {DateTime.UtcNow}"); - var currentOrders = _orderCache.GetAllOrders(); - var currentPositions = _orderCache.GetPositions(); + var currentOrdersJson = _orderCache.GetAllOrders().ToJson(); + var ordersTimestamp = DateTime.UtcNow; + var currentPositionsJson = _orderCache.GetPositions().ToJson(); + var positionsTimestamp = DateTime.UtcNow; + + // json is deserialized to create a deep copy + // Order and Position objects are already complex, no sense in adding a specialized deep copy / clone method + var currentOrders = currentOrdersJson.DeserializeJson>(); + var currentPositions = currentPositionsJson.DeserializeJson>(); var tradingEngineSnapshot = await _tradingEngineSnapshotsRepository.GetLastAsync(); await _log.WriteInfoAsync(nameof(SnapshotValidationService), nameof(ValidateCurrentStateAsync), @@ -62,8 +69,8 @@ await _log.WriteInfoAsync(nameof(SnapshotValidationService), nameof(ValidateCurr var lastOrders = GetOrders(tradingEngineSnapshot); var lastPositions = GetPositions(tradingEngineSnapshot); - var ordersHistory = await _ordersHistoryRepository.GetLastSnapshot(tradingEngineSnapshot.Timestamp); - var positionsHistory = await _positionsHistoryRepository.GetLastSnapshot(tradingEngineSnapshot.Timestamp); + var ordersHistory = await _ordersHistoryRepository.GetLastSnapshot(tradingEngineSnapshot.Timestamp, ordersTimestamp); + var positionsHistory = await _positionsHistoryRepository.GetLastSnapshot(tradingEngineSnapshot.Timestamp, positionsTimestamp); var restoredOrders = RestoreOrdersCurrentStateFromHistory(lastOrders, ordersHistory); var restoredPositions = RestorePositionsCurrentStateFromHistory(lastPositions, positionsHistory); @@ -97,7 +104,8 @@ await _log.WriteWarningAsync(nameof(SnapshotValidationService), nameof(ValidateC { Orders = ordersValidationResult, Positions = positionsValidationResult, - PreviousSnapshotCorrelationId = tradingEngineSnapshot.CorrelationId + PreviousSnapshotCorrelationId = tradingEngineSnapshot.CorrelationId, + Cache = new InMemorySnapshot(currentOrders, currentPositions), }; } @@ -193,7 +201,7 @@ private static ValidationResult CompareOrders(ImmutableArray c { Extra = extraOrders.ToList(), Missed = missedOrders.ToList(), - Inconsistent = inconsistentOrders.ToList() + Inconsistent = inconsistentOrders.ToList(), }; } @@ -225,7 +233,7 @@ private static ValidationResult ComparePositions(ImmutableArray + diff --git a/src/MarginTrading.Backend.Services/Modules/CacheModule.cs b/src/MarginTrading.Backend.Services/Modules/CacheModule.cs index 482472998..fec4291ef 100644 --- a/src/MarginTrading.Backend.Services/Modules/CacheModule.cs +++ b/src/MarginTrading.Backend.Services/Modules/CacheModule.cs @@ -4,6 +4,7 @@ using Autofac; using MarginTrading.AssetService.Contracts.ClientProfileSettings; using MarginTrading.Backend.Core; +using MarginTrading.Backend.Core.Orders; using MarginTrading.Backend.Core.Services; using MarginTrading.Backend.Services.AssetPairs; using MarginTrading.Backend.Services.Caches; diff --git a/src/MarginTrading.Backend.Services/Policies/SnapshotStateValidationPolicy.cs b/src/MarginTrading.Backend.Services/Policies/SnapshotStateValidationPolicy.cs new file mode 100644 index 000000000..64a646642 --- /dev/null +++ b/src/MarginTrading.Backend.Services/Policies/SnapshotStateValidationPolicy.cs @@ -0,0 +1,27 @@ +// Copyright (c) 2019 Lykke Corp. +// See the LICENSE file in the project root for more information. + +using System; +using Common.Log; +using MarginTrading.Backend.Services.Infrastructure; +using Microsoft.Extensions.Logging; +using Polly; +using Polly.Retry; + +namespace MarginTrading.Backend.Services.Policies +{ + public static class SnapshotStateValidationPolicy + { + public static AsyncRetryPolicy BuildPolicy(ILog logger) + { + return Policy + .Handle() + .WaitAndRetryAsync(3, + x => TimeSpan.FromSeconds(x * 5), + (exception, span) => logger.WriteWarningAsync( + nameof(SnapshotService), + nameof(SnapshotService.MakeTradingDataSnapshot), + $"Exception: {exception?.Message}")); + } + } +} \ No newline at end of file diff --git a/src/MarginTrading.Backend.Services/TradingConditions/TradingInstrumentsCacheService.cs b/src/MarginTrading.Backend.Services/TradingConditions/TradingInstrumentsCacheService.cs index b79f2b0dd..70fdd584b 100644 --- a/src/MarginTrading.Backend.Services/TradingConditions/TradingInstrumentsCacheService.cs +++ b/src/MarginTrading.Backend.Services/TradingConditions/TradingInstrumentsCacheService.cs @@ -10,6 +10,7 @@ using MarginTrading.Backend.Contracts.Events; using MarginTrading.Backend.Core; using MarginTrading.Backend.Core.Messages; +using MarginTrading.Backend.Core.Orders; using MarginTrading.Backend.Core.Repositories; using MarginTrading.Backend.Core.Settings; using MarginTrading.Backend.Core.TradingConditions; diff --git a/src/MarginTrading.Backend/Controllers/AccountsController.cs b/src/MarginTrading.Backend/Controllers/AccountsController.cs index b1e69ac6f..d97b65bd8 100644 --- a/src/MarginTrading.Backend/Controllers/AccountsController.cs +++ b/src/MarginTrading.Backend/Controllers/AccountsController.cs @@ -8,6 +8,7 @@ using MarginTrading.Backend.Contracts.Account; using MarginTrading.Backend.Core; using MarginTrading.Backend.Core.Exceptions; +using MarginTrading.Backend.Core.Orders; using MarginTrading.Backend.Core.Services; using MarginTrading.Backend.Exceptions; using MarginTrading.Backend.Services; diff --git a/src/MarginTrading.SqlRepositories/Repositories/OrdersHistoryRepository.cs b/src/MarginTrading.SqlRepositories/Repositories/OrdersHistoryRepository.cs index 4977b17cd..25a2ccee5 100644 --- a/src/MarginTrading.SqlRepositories/Repositories/OrdersHistoryRepository.cs +++ b/src/MarginTrading.SqlRepositories/Repositories/OrdersHistoryRepository.cs @@ -34,7 +34,7 @@ CASE [Status] ELSE 99 END as StatusOrder FROM [{0}] oh (NOLOCK) - WHERE oh.ModifiedTimestamp > @Timestamp + WHERE oh.ModifiedTimestamp > @From and ((@To is NULL) OR (oh.ModifiedTimestamp <= @To)) ), filteredOrderHistWithRowNumber AS ( SELECT *, ROW_NUMBER() OVER (PARTITION BY Id ORDER BY @@ -54,14 +54,14 @@ public OrdersHistoryRepository(string connectionString, string tableName, int ge _getLastSnapshotTimeoutS = getLastSnapshotTimeoutS; } - public async Task> GetLastSnapshot(DateTime @from) + public async Task> GetLastSnapshot(DateTime from, DateTime? to = null) { - using (var conn = new SqlConnection(_connectionString)) - { - var data = await conn.QueryAsync(_select, new { Timestamp = @from }, commandTimeout: _getLastSnapshotTimeoutS); + await using var conn = new SqlConnection(_connectionString); + var data = await conn.QueryAsync(_select, + new { From = from, To = to }, + commandTimeout: _getLastSnapshotTimeoutS); - return data.Cast().ToList(); - } + return data.Cast().ToList(); } } } \ No newline at end of file diff --git a/src/MarginTrading.SqlRepositories/Repositories/PositionsHistoryRepository.cs b/src/MarginTrading.SqlRepositories/Repositories/PositionsHistoryRepository.cs index ba5364239..19b759b85 100644 --- a/src/MarginTrading.SqlRepositories/Repositories/PositionsHistoryRepository.cs +++ b/src/MarginTrading.SqlRepositories/Repositories/PositionsHistoryRepository.cs @@ -22,7 +22,7 @@ public class PositionsHistoryRepository : IPositionsHistoryRepository SELECT *, ROW_NUMBER() OVER (PARTITION BY Id ORDER BY HistoryTimestamp DESC) AS rn FROM [{0}] ph - WHERE ph.HistoryTimestamp > @Timestamp + WHERE ph.HistoryTimestamp > @From and ((@To is NULL) OR (ph.HistoryTimestamp <= @To)) ) SELECT * FROM cte @@ -35,14 +35,14 @@ public PositionsHistoryRepository(string connectionString, string tableName, int _select = string.Format(_select, tableName); } - public async Task> GetLastSnapshot(DateTime @from) + public async Task> GetLastSnapshot(DateTime from, DateTime? to = null) { - using (var conn = new SqlConnection(_connectionString)) - { - var data = await conn.QueryAsync(_select, new { Timestamp = @from }, commandTimeout: _getLastSnapshotTimeoutS); + await using var conn = new SqlConnection(_connectionString); + var data = await conn.QueryAsync(_select, + new { From = from, To = to }, + commandTimeout: _getLastSnapshotTimeoutS); - return data.Cast().ToList(); - } + return data.Cast().ToList(); } } } \ No newline at end of file diff --git a/tests/MarginTradingTests/Infrastructure/SnapshotValidationServiceTests.cs b/tests/MarginTradingTests/Infrastructure/SnapshotValidationServiceTests.cs index 33dccad2e..8e2b608b8 100644 --- a/tests/MarginTradingTests/Infrastructure/SnapshotValidationServiceTests.cs +++ b/tests/MarginTradingTests/Infrastructure/SnapshotValidationServiceTests.cs @@ -62,10 +62,10 @@ public void SetUp() _orderCacheMock.Setup(o => o.GetPositions()) .Returns(() => _currentPositions.ToImmutableArray()); - _ordersHistoryRepositoryMock.Setup(o => o.GetLastSnapshot(It.IsAny())) + _ordersHistoryRepositoryMock.Setup(o => o.GetLastSnapshot(It.IsAny(), null)) .ReturnsAsync((DateTime date) => _ordersHistory); - _positionsHistoryRepositoryMock.Setup(o => o.GetLastSnapshot(It.IsAny())) + _positionsHistoryRepositoryMock.Setup(o => o.GetLastSnapshot(It.IsAny(), null)) .ReturnsAsync((DateTime date) => _positionsHistory); _service = new SnapshotValidationService( diff --git a/tests/MarginTradingTests/Modules/MockRepositoriesModule.cs b/tests/MarginTradingTests/Modules/MockRepositoriesModule.cs index 9cfd403a4..5a626769f 100644 --- a/tests/MarginTradingTests/Modules/MockRepositoriesModule.cs +++ b/tests/MarginTradingTests/Modules/MockRepositoriesModule.cs @@ -29,11 +29,11 @@ protected override void Load(ContainerBuilder builder) blobRepository.Setup(s => s.ReadWithTimestampAsync>(It.IsAny(), It.IsAny())) .ReturnsAsync((new List(), DateTime.UtcNow)); var orderHistoryRepository = new Mock(); - orderHistoryRepository.Setup(s => s.GetLastSnapshot(It.IsAny())) + orderHistoryRepository.Setup(s => s.GetLastSnapshot(It.IsAny(), null)) .ReturnsAsync(new List()); var positionHistoryRepository = new Mock(); var accountHistoryRepository = new Mock(); - positionHistoryRepository.Setup(s => s.GetLastSnapshot(It.IsAny())) + positionHistoryRepository.Setup(s => s.GetLastSnapshot(It.IsAny(), null)) .ReturnsAsync(new List()); accountHistoryRepository.Setup(s => s.GetSwapTotalPerPosition(It.IsAny>())) .ReturnsAsync(new Dictionary()); diff --git a/tests/MarginTradingTests/ProductChangedProjectionTests.cs b/tests/MarginTradingTests/ProductChangedProjectionTests.cs index 11a112f16..8e12cbe53 100644 --- a/tests/MarginTradingTests/ProductChangedProjectionTests.cs +++ b/tests/MarginTradingTests/ProductChangedProjectionTests.cs @@ -9,9 +9,9 @@ using MarginTrading.AssetService.Contracts.Enums; using MarginTrading.AssetService.Contracts.Products; using MarginTrading.Backend.Core; +using MarginTrading.Backend.Core.Orders; using MarginTrading.Backend.Core.Rfq; using MarginTrading.Backend.Core.Settings; -using MarginTrading.Backend.Services; using MarginTrading.Backend.Services.AssetPairs; using MarginTrading.Backend.Services.Services; using MarginTrading.Backend.Services.TradingConditions; From 43e5168773f34ff1e522c6b7cd40326f9bf86449 Mon Sep 17 00:00:00 2001 From: Gennadii Ponomarev Date: Fri, 22 Nov 2024 10:48:17 +0500 Subject: [PATCH 2/3] fix(LT-5508): extract IOrderReaderBase --- .../Orders/IOrderReader.cs | 5 +---- .../Orders/IOrderReaderBase.cs | 15 +++++++++++++++ .../Snapshots/InMemorySnapshot.cs | 17 +---------------- .../Snapshots/SnapshotValidationResult.cs | 2 +- .../Mappers/DomainToContractMappers.cs | 8 ++++---- 5 files changed, 22 insertions(+), 25 deletions(-) create mode 100644 src/MarginTrading.Backend.Core/Orders/IOrderReaderBase.cs diff --git a/src/MarginTrading.Backend.Core/Orders/IOrderReader.cs b/src/MarginTrading.Backend.Core/Orders/IOrderReader.cs index e7266a2ce..7c22f7cb7 100644 --- a/src/MarginTrading.Backend.Core/Orders/IOrderReader.cs +++ b/src/MarginTrading.Backend.Core/Orders/IOrderReader.cs @@ -6,13 +6,10 @@ namespace MarginTrading.Backend.Core.Orders { - public interface IOrderReader + public interface IOrderReader : IOrderReaderBase { - ImmutableArray GetAllOrders(); - ImmutableArray GetPositions(); ImmutableArray GetPositions(string instrument); ImmutableArray GetPositionsByFxAssetPairId(string fxAssetPairId); ImmutableArray GetPending(); - bool TryGetOrderById(string orderId, out Order order); } } \ No newline at end of file diff --git a/src/MarginTrading.Backend.Core/Orders/IOrderReaderBase.cs b/src/MarginTrading.Backend.Core/Orders/IOrderReaderBase.cs new file mode 100644 index 000000000..3f840a0bd --- /dev/null +++ b/src/MarginTrading.Backend.Core/Orders/IOrderReaderBase.cs @@ -0,0 +1,15 @@ +// Copyright (c) 2019 Lykke Corp. +// See the LICENSE file in the project root for more information. + +using System.Collections.Immutable; +using MarginTrading.Backend.Core.Trading; + +namespace MarginTrading.Backend.Core.Orders +{ + public interface IOrderReaderBase + { + ImmutableArray GetAllOrders(); + ImmutableArray GetPositions(); + bool TryGetOrderById(string orderId, out Order order); + } +} \ No newline at end of file diff --git a/src/MarginTrading.Backend.Core/Snapshots/InMemorySnapshot.cs b/src/MarginTrading.Backend.Core/Snapshots/InMemorySnapshot.cs index 1db203ca1..33a50bc1c 100644 --- a/src/MarginTrading.Backend.Core/Snapshots/InMemorySnapshot.cs +++ b/src/MarginTrading.Backend.Core/Snapshots/InMemorySnapshot.cs @@ -8,7 +8,7 @@ namespace MarginTrading.Backend.Core.Snapshots { - public class InMemorySnapshot : IOrderReader + public class InMemorySnapshot : IOrderReaderBase { private readonly ImmutableArray _orders; private readonly ImmutableArray _positions; @@ -29,21 +29,6 @@ public ImmutableArray GetPositions() return _positions; } - public ImmutableArray GetPositions(string instrument) - { - return _positions.Where(x => x.AssetPairId == instrument).ToImmutableArray(); - } - - public ImmutableArray GetPositionsByFxAssetPairId(string fxAssetPairId) - { - return _positions.Where(x => x.FxAssetPairId == fxAssetPairId).ToImmutableArray(); - } - - public ImmutableArray GetPending() - { - return _orders.Where(x => x.Status == OrderStatus.Active).ToImmutableArray(); - } - public bool TryGetOrderById(string orderId, out Order order) { order = _orders.FirstOrDefault(x => x.Id == orderId); diff --git a/src/MarginTrading.Backend.Core/Snapshots/SnapshotValidationResult.cs b/src/MarginTrading.Backend.Core/Snapshots/SnapshotValidationResult.cs index f6cb490d9..57a4943c8 100644 --- a/src/MarginTrading.Backend.Core/Snapshots/SnapshotValidationResult.cs +++ b/src/MarginTrading.Backend.Core/Snapshots/SnapshotValidationResult.cs @@ -19,6 +19,6 @@ public class SnapshotValidationResult public string PreviousSnapshotCorrelationId { get; set; } - public IOrderReader Cache { get; set; } + public IOrderReaderBase Cache { get; set; } } } \ No newline at end of file diff --git a/src/MarginTrading.Backend.Services/Mappers/DomainToContractMappers.cs b/src/MarginTrading.Backend.Services/Mappers/DomainToContractMappers.cs index b004d45f1..a32309af6 100644 --- a/src/MarginTrading.Backend.Services/Mappers/DomainToContractMappers.cs +++ b/src/MarginTrading.Backend.Services/Mappers/DomainToContractMappers.cs @@ -22,7 +22,7 @@ namespace MarginTrading.Backend.Services.Mappers { public static class DomainToContractMappers { - public static OrderContract ConvertToContract(this Order order, IOrderReader orderReader) + public static OrderContract ConvertToContract(this Order order, IOrderReaderBase orderReader) { var relatedOrders = new List(); @@ -44,7 +44,7 @@ public static OrderContract ConvertToContract(this Order order, IOrderReader ord /// Snapshot status /// /// - public static object ConvertToSnapshotContract(this Order order, IOrderReader orderReader, SnapshotStatus status = SnapshotStatus.Final) + public static object ConvertToSnapshotContract(this Order order, IOrderReaderBase orderReader, SnapshotStatus status = SnapshotStatus.Final) { return status == SnapshotStatus.Draft ? (object) order @@ -132,7 +132,7 @@ RelatedOrderInfoContract Map(RelatedOrderInfo relatedOrderInfo) }; } - public static OpenPositionContract ConvertToContract(this Position position, IOrderReader orderReader) + public static OpenPositionContract ConvertToContract(this Position position, IOrderReaderBase orderReader) { var relatedOrders = new List(); @@ -193,7 +193,7 @@ public static OpenPositionContract ConvertToContract(this Position position, IOr /// Snapshot status /// /// - public static object ConvertToSnapshotContract(this Position position, IOrderReader orderReader, SnapshotStatus status = SnapshotStatus.Final) + public static object ConvertToSnapshotContract(this Position position, IOrderReaderBase orderReader, SnapshotStatus status = SnapshotStatus.Final) { return status == SnapshotStatus.Draft ? (object) position From 953bed8af36912f4f24ccab13f4b24af3681d960 Mon Sep 17 00:00:00 2001 From: Gennadii Ponomarev Date: Fri, 22 Nov 2024 11:47:31 +0500 Subject: [PATCH 3/3] fix(LT-5508): improve policy & error handling --- .../Exceptions/SnapshotValidationError.cs | 12 ++++ .../Exceptions/SnapshotValidationException.cs | 22 ++++++++ .../Snapshots/SnapshotValidationResult.cs | 8 ++- .../Infrastructure/SnapshotService.cs | 56 +++++++++++++------ .../Policies/SnapshotStateValidationPolicy.cs | 18 +++--- 5 files changed, 90 insertions(+), 26 deletions(-) create mode 100644 src/MarginTrading.Backend.Core/Exceptions/SnapshotValidationError.cs create mode 100644 src/MarginTrading.Backend.Core/Exceptions/SnapshotValidationException.cs diff --git a/src/MarginTrading.Backend.Core/Exceptions/SnapshotValidationError.cs b/src/MarginTrading.Backend.Core/Exceptions/SnapshotValidationError.cs new file mode 100644 index 000000000..d51c00d95 --- /dev/null +++ b/src/MarginTrading.Backend.Core/Exceptions/SnapshotValidationError.cs @@ -0,0 +1,12 @@ +// Copyright (c) 2019 Lykke Corp. +// See the LICENSE file in the project root for more information. + +namespace MarginTrading.Backend.Core.Exceptions +{ + public enum SnapshotValidationError + { + None = 0, + Unknown = 1, + InvalidOrderOrPositionState = 2, + } +} \ No newline at end of file diff --git a/src/MarginTrading.Backend.Core/Exceptions/SnapshotValidationException.cs b/src/MarginTrading.Backend.Core/Exceptions/SnapshotValidationException.cs new file mode 100644 index 000000000..3a8c8c7d0 --- /dev/null +++ b/src/MarginTrading.Backend.Core/Exceptions/SnapshotValidationException.cs @@ -0,0 +1,22 @@ +// Copyright (c) 2019 Lykke Corp. +// See the LICENSE file in the project root for more information. + +using System; + +namespace MarginTrading.Backend.Core.Exceptions +{ + public class SnapshotValidationException : ValidationException + { + public SnapshotValidationException(SnapshotValidationError errorCode) : base(errorCode) + { + } + + public SnapshotValidationException(string message, SnapshotValidationError errorCode) : base(message, errorCode) + { + } + + public SnapshotValidationException(string message, SnapshotValidationError errorCode, Exception innerException) : base(message, errorCode, innerException) + { + } + } +} \ No newline at end of file diff --git a/src/MarginTrading.Backend.Core/Snapshots/SnapshotValidationResult.cs b/src/MarginTrading.Backend.Core/Snapshots/SnapshotValidationResult.cs index 57a4943c8..6af1ffaa9 100644 --- a/src/MarginTrading.Backend.Core/Snapshots/SnapshotValidationResult.cs +++ b/src/MarginTrading.Backend.Core/Snapshots/SnapshotValidationResult.cs @@ -1,8 +1,8 @@ // Copyright (c) 2019 Lykke Corp. // See the LICENSE file in the project root for more information. +using MarginTrading.Backend.Core.Exceptions; using MarginTrading.Backend.Core.Orders; -using MarginTrading.Backend.Core.Trading; namespace MarginTrading.Backend.Core.Snapshots { @@ -11,7 +11,9 @@ namespace MarginTrading.Backend.Core.Snapshots /// public class SnapshotValidationResult { - public bool IsValid => Orders.IsValid && Positions.IsValid; + public bool IsValid => Orders is { IsValid: true } + && Positions is { IsValid: true } + && Exception == null; public ValidationResult Orders { get; set; } @@ -20,5 +22,7 @@ public class SnapshotValidationResult public string PreviousSnapshotCorrelationId { get; set; } public IOrderReaderBase Cache { get; set; } + + public SnapshotValidationException Exception { get; set; } } } \ No newline at end of file diff --git a/src/MarginTrading.Backend.Services/Infrastructure/SnapshotService.cs b/src/MarginTrading.Backend.Services/Infrastructure/SnapshotService.cs index f0bf638cc..e6283a57b 100644 --- a/src/MarginTrading.Backend.Services/Infrastructure/SnapshotService.cs +++ b/src/MarginTrading.Backend.Services/Infrastructure/SnapshotService.cs @@ -10,6 +10,7 @@ using Common.Log; using MarginTrading.Backend.Contracts.Prices; using MarginTrading.Backend.Core; +using MarginTrading.Backend.Core.Exceptions; using MarginTrading.Backend.Core.Repositories; using MarginTrading.Backend.Core.Services; using MarginTrading.Backend.Core.Settings; @@ -19,6 +20,7 @@ using MarginTrading.Backend.Services.Policies; using MarginTrading.Common.Services; using MoreLinq; +using Polly; using Polly.Retry; namespace MarginTrading.Backend.Services.Infrastructure @@ -43,7 +45,7 @@ public class SnapshotService : ISnapshotService private static readonly SemaphoreSlim Lock = new SemaphoreSlim(1, 1); public static bool IsMakingSnapshotInProgress => Lock.CurrentCount == 0; - private AsyncRetryPolicy _policy; + private AsyncRetryPolicy _policy; public SnapshotService( IScheduleSettingsCacheService scheduleSettingsCacheService, @@ -113,6 +115,14 @@ public async Task MakeTradingDataSnapshot(DateTime tradingDay, string co _snapshotStatusTracker.SnapshotInProgress(); var validationResult = await _policy.ExecuteAsync(() => Validate(correlationId)); + if (!validationResult.IsValid) + { + await _log.WriteFatalErrorAsync(nameof(SnapshotService), + nameof(MakeTradingDataSnapshot), + validationResult.ToJson(), + validationResult.Exception); + throw validationResult.Exception; + } // orders and positions are fixed at the moment of validation var orders = validationResult.Cache.GetAllOrders(); @@ -206,26 +216,38 @@ await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapsho private async Task Validate(string correlationId) { - // Before starting snapshot creation the current state should be validated. - var validationResult = await _snapshotValidationService.ValidateCurrentStateAsync(); - - if (!validationResult.IsValid) + try { - var ex = new InvalidOperationException( - $"The trading data snapshot might be corrupted. The current state of orders and positions is incorrect. Check the dbo.BlobData table for more info: container {LykkeConstants.MtCoreSnapshotBlobContainer}, correlationId {correlationId}"); - await _log.WriteFatalErrorAsync(nameof(SnapshotService), - nameof(MakeTradingDataSnapshot), - validationResult.ToJson(), - ex); - await _blobRepository.WriteAsync(LykkeConstants.MtCoreSnapshotBlobContainer, correlationId, validationResult); + // Before starting snapshot creation the current state should be validated. + var validationResult = await _snapshotValidationService.ValidateCurrentStateAsync(); + + if (!validationResult.IsValid) + { + var errorMessage = + "The trading data snapshot might be corrupted. The current state of orders and positions is incorrect. Check the dbo.BlobData table for more info: container {LykkeConstants.MtCoreSnapshotBlobContainer}, correlationId {correlationId}"; + var ex = new SnapshotValidationException(errorMessage, + SnapshotValidationError.InvalidOrderOrPositionState); + validationResult.Exception = ex; + await _blobRepository.WriteAsync(LykkeConstants.MtCoreSnapshotBlobContainer, correlationId, validationResult); + } + else + { + await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapshot), + "The current state of orders and positions is correct."); + } + + return validationResult; } - else + catch (Exception e) { - await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapshot), - "The current state of orders and positions is correct."); - } + // in case validation fails for some reason (not related to orders / positions inconsistency, e.g. a network error during validation) + var result = new SnapshotValidationResult + { + Exception = new SnapshotValidationException("Snapshot validation failed", SnapshotValidationError.Unknown, e), + }; - return validationResult; + return result; + } } /// diff --git a/src/MarginTrading.Backend.Services/Policies/SnapshotStateValidationPolicy.cs b/src/MarginTrading.Backend.Services/Policies/SnapshotStateValidationPolicy.cs index 64a646642..f91cf3241 100644 --- a/src/MarginTrading.Backend.Services/Policies/SnapshotStateValidationPolicy.cs +++ b/src/MarginTrading.Backend.Services/Policies/SnapshotStateValidationPolicy.cs @@ -2,9 +2,10 @@ // See the LICENSE file in the project root for more information. using System; +using System.Threading.Tasks; using Common.Log; +using MarginTrading.Backend.Core.Snapshots; using MarginTrading.Backend.Services.Infrastructure; -using Microsoft.Extensions.Logging; using Polly; using Polly.Retry; @@ -12,16 +13,19 @@ namespace MarginTrading.Backend.Services.Policies { public static class SnapshotStateValidationPolicy { - public static AsyncRetryPolicy BuildPolicy(ILog logger) + public static AsyncRetryPolicy BuildPolicy(ILog logger) { return Policy - .Handle() + .HandleResult(x => !x.IsValid) .WaitAndRetryAsync(3, x => TimeSpan.FromSeconds(x * 5), - (exception, span) => logger.WriteWarningAsync( - nameof(SnapshotService), - nameof(SnapshotService.MakeTradingDataSnapshot), - $"Exception: {exception?.Message}")); + (result, span) => logger?.WriteWarningAsync( + nameof(SnapshotService), + nameof(SnapshotService.MakeTradingDataSnapshot), + $"Exception: {result?.Result?.Exception}", + result?.Result?.Exception) + // in case logger is not provided + ?? Task.CompletedTask); } } } \ No newline at end of file