Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(LT-5508): orders and positions are fixed during state validation #526

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) 2019 Lykke Corp.
// See the LICENSE file in the project root for more information.

namespace MarginTrading.Backend.Core.Exceptions
{
public enum SnapshotValidationError
{
None = 0,
Unknown = 1,
InvalidOrderOrPositionState = 2,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) 2019 Lykke Corp.
// See the LICENSE file in the project root for more information.

using System;

namespace MarginTrading.Backend.Core.Exceptions
{
public class SnapshotValidationException : ValidationException<SnapshotValidationError>
{
public SnapshotValidationException(SnapshotValidationError errorCode) : base(errorCode)
{
}

public SnapshotValidationException(string message, SnapshotValidationError errorCode) : base(message, errorCode)
{
}

public SnapshotValidationException(string message, SnapshotValidationError errorCode, Exception innerException) : base(message, errorCode, innerException)
{
}
}
}
15 changes: 15 additions & 0 deletions src/MarginTrading.Backend.Core/Orders/IOrderReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) 2019 Lykke Corp.
// See the LICENSE file in the project root for more information.

using System.Collections.Immutable;
using MarginTrading.Backend.Core.Trading;

namespace MarginTrading.Backend.Core.Orders
{
public interface IOrderReader : IOrderReaderBase
{
ImmutableArray<Position> GetPositions(string instrument);
ImmutableArray<Position> GetPositionsByFxAssetPairId(string fxAssetPairId);
ImmutableArray<Order> GetPending();
}
}
15 changes: 15 additions & 0 deletions src/MarginTrading.Backend.Core/Orders/IOrderReaderBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) 2019 Lykke Corp.
// See the LICENSE file in the project root for more information.

using System.Collections.Immutable;
using MarginTrading.Backend.Core.Trading;

namespace MarginTrading.Backend.Core.Orders
{
public interface IOrderReaderBase
{
ImmutableArray<Order> GetAllOrders();
ImmutableArray<Position> GetPositions();
bool TryGetOrderById(string orderId, out Order order);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ namespace MarginTrading.Backend.Core.Repositories
{
public interface IOrdersHistoryRepository
{
Task<IReadOnlyList<IOrderHistory>> GetLastSnapshot(DateTime @from);
Task<IReadOnlyList<IOrderHistory>> GetLastSnapshot(DateTime from, DateTime? to = null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ namespace MarginTrading.Backend.Core.Repositories
{
public interface IPositionsHistoryRepository
{
Task<IReadOnlyList<IPositionHistory>> GetLastSnapshot(DateTime @from);
Task<IReadOnlyList<IPositionHistory>> GetLastSnapshot(DateTime from, DateTime? to = null);
}
}
38 changes: 38 additions & 0 deletions src/MarginTrading.Backend.Core/Snapshots/InMemorySnapshot.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) 2019 Lykke Corp.
// See the LICENSE file in the project root for more information.

using System.Collections.Immutable;
using System.Linq;
using MarginTrading.Backend.Core.Orders;
using MarginTrading.Backend.Core.Trading;

namespace MarginTrading.Backend.Core.Snapshots
{
public class InMemorySnapshot : IOrderReaderBase
{
private readonly ImmutableArray<Order> _orders;
private readonly ImmutableArray<Position> _positions;

public InMemorySnapshot(ImmutableArray<Order> orders, ImmutableArray<Position> positions)
{
_orders = orders;
_positions = positions;
}

public ImmutableArray<Order> GetAllOrders()
{
return _orders;
}

public ImmutableArray<Position> GetPositions()
{
return _positions;
}

public bool TryGetOrderById(string orderId, out Order order)
{
order = _orders.FirstOrDefault(x => x.Id == orderId);
return order != null;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
// Copyright (c) 2019 Lykke Corp.
// See the LICENSE file in the project root for more information.

using MarginTrading.Backend.Core.Exceptions;
using MarginTrading.Backend.Core.Orders;

namespace MarginTrading.Backend.Core.Snapshots
{
/// <summary>
/// Represents result of trading state validation.
/// </summary>
public class SnapshotValidationResult
{
public bool IsValid => Orders.IsValid && Positions.IsValid;
public bool IsValid => Orders is { IsValid: true }
&& Positions is { IsValid: true }
&& Exception == null;

public ValidationResult<OrderInfo> Orders { get; set; }

public ValidationResult<PositionInfo> Positions { get; set; }

public string PreviousSnapshotCorrelationId { get; set; }

public IOrderReaderBase Cache { get; set; }

public SnapshotValidationException Exception { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace MarginTrading.Backend.Core.Snapshots
/// <summary>
/// Represent result of trading state validation for <see cref="T"/> entity.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <typeparam name="T">Short representation of entity</typeparam>
public class ValidationResult<T>
{
/// <summary>
Expand Down
10 changes: 0 additions & 10 deletions src/MarginTrading.Backend.Services/Caches/OrderCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,6 @@

namespace MarginTrading.Backend.Services
{
public interface IOrderReader
{
ImmutableArray<Order> GetAllOrders();
ImmutableArray<Position> GetPositions();
ImmutableArray<Position> GetPositions(string instrument);
ImmutableArray<Position> GetPositionsByFxAssetPairId(string fxAssetPairId);
ImmutableArray<Order> GetPending();
bool TryGetOrderById(string orderId, out Order order);
}

public class OrdersCache : IOrderReader
{
public OrdersCache()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@
using Common.Log;
using MarginTrading.Backend.Contracts.Prices;
using MarginTrading.Backend.Core;
using MarginTrading.Backend.Core.Exceptions;
using MarginTrading.Backend.Core.Repositories;
using MarginTrading.Backend.Core.Services;
using MarginTrading.Backend.Core.Settings;
using MarginTrading.Backend.Core.Snapshots;
using MarginTrading.Backend.Services.AssetPairs;
using MarginTrading.Backend.Services.Mappers;
using MarginTrading.Backend.Services.Policies;
using MarginTrading.Common.Services;
using MoreLinq;
using Polly;
using Polly.Retry;

namespace MarginTrading.Backend.Services.Infrastructure
{
Expand All @@ -27,7 +31,6 @@ public class SnapshotService : ISnapshotService
private readonly IAccountsCacheService _accountsCacheService;
private readonly IQuoteCacheService _quoteCacheService;
private readonly IFxRateCacheService _fxRateCacheService;
private readonly IOrderReader _orderReader;
private readonly IDateService _dateService;

private readonly ITradingEngineSnapshotsRepository _tradingEngineSnapshotsRepository;
Expand All @@ -42,12 +45,13 @@ public class SnapshotService : ISnapshotService
private static readonly SemaphoreSlim Lock = new SemaphoreSlim(1, 1);
public static bool IsMakingSnapshotInProgress => Lock.CurrentCount == 0;

private AsyncRetryPolicy<SnapshotValidationResult> _policy;

public SnapshotService(
IScheduleSettingsCacheService scheduleSettingsCacheService,
IAccountsCacheService accountsCacheService,
IQuoteCacheService quoteCacheService,
IFxRateCacheService fxRateCacheService,
IOrderReader orderReader,
IDateService dateService,
ITradingEngineSnapshotsRepository tradingEngineSnapshotsRepository,
ISnapshotValidationService snapshotValidationService,
Expand All @@ -62,7 +66,6 @@ public SnapshotService(
_accountsCacheService = accountsCacheService;
_quoteCacheService = quoteCacheService;
_fxRateCacheService = fxRateCacheService;
_orderReader = orderReader;
_dateService = dateService;
_tradingEngineSnapshotsRepository = tradingEngineSnapshotsRepository;
_snapshotValidationService = snapshotValidationService;
Expand All @@ -72,6 +75,8 @@ public SnapshotService(
_finalSnapshotCalculator = finalSnapshotCalculator;
_snapshotStatusTracker = snapshotStatusTracker;
_settings = settings;

_policy = SnapshotStateValidationPolicy.BuildPolicy(log);
}

/// <inheritdoc />
Expand Down Expand Up @@ -103,38 +108,32 @@ public async Task<string> MakeTradingDataSnapshot(DateTime tradingDay, string co
// If one or more queues contain not delivered messages the snapshot can not be created.
_queueValidationService.ThrowExceptionIfQueuesNotEmpty(true);

// Before starting snapshot creation the current state should be validated.
var validationResult = await _snapshotValidationService.ValidateCurrentStateAsync();

if (!validationResult.IsValid)
{
var ex = new InvalidOperationException(
$"The trading data snapshot might be corrupted. The current state of orders and positions is incorrect. Check the dbo.BlobData table for more info: container {LykkeConstants.MtCoreSnapshotBlobContainer}, correlationId {correlationId}");
await _log.WriteFatalErrorAsync(nameof(SnapshotService),
nameof(MakeTradingDataSnapshot),
validationResult.ToJson(),
ex);
await _blobRepository.WriteAsync(LykkeConstants.MtCoreSnapshotBlobContainer, correlationId, validationResult);
}
else
{
await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapshot),
"The current state of orders and positions is correct.");
}

await Lock.WaitAsync();

try
{
_snapshotStatusTracker.SnapshotInProgress();

var orders = _orderReader.GetAllOrders();
var ordersJson = orders.Select(o => o.ConvertToSnapshotContract(_orderReader, status)).ToJson();
var validationResult = await _policy.ExecuteAsync(() => Validate(correlationId));
if (!validationResult.IsValid)
{
await _log.WriteFatalErrorAsync(nameof(SnapshotService),
nameof(MakeTradingDataSnapshot),
validationResult.ToJson(),
validationResult.Exception);
throw validationResult.Exception;
}

// orders and positions are fixed at the moment of validation
var orders = validationResult.Cache.GetAllOrders();
var ordersJson = orders
.Select(o => o.ConvertToSnapshotContract(validationResult.Cache, status)).ToJson();
await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapshot),
$"Preparing data... {orders.Length} orders prepared.");

var positions = _orderReader.GetPositions();
var positionsJson = positions.Select(p => p.ConvertToSnapshotContract(_orderReader, status)).ToJson();
var positions = validationResult.Cache.GetPositions();
var positionsJson = positions
.Select(p => p.ConvertToSnapshotContract(validationResult.Cache, status)).ToJson();
await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapshot),
$"Preparing data... {positions.Length} positions prepared.");

Expand Down Expand Up @@ -215,6 +214,42 @@ await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapsho
}
}

private async Task<SnapshotValidationResult> Validate(string correlationId)
{
try
{
// Before starting snapshot creation the current state should be validated.
var validationResult = await _snapshotValidationService.ValidateCurrentStateAsync();

if (!validationResult.IsValid)
{
var errorMessage =
"The trading data snapshot might be corrupted. The current state of orders and positions is incorrect. Check the dbo.BlobData table for more info: container {LykkeConstants.MtCoreSnapshotBlobContainer}, correlationId {correlationId}";
var ex = new SnapshotValidationException(errorMessage,
SnapshotValidationError.InvalidOrderOrPositionState);
validationResult.Exception = ex;
await _blobRepository.WriteAsync(LykkeConstants.MtCoreSnapshotBlobContainer, correlationId, validationResult);
}
else
{
await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapshot),
"The current state of orders and positions is correct.");
}

return validationResult;
}
catch (Exception e)
{
// in case validation fails for some reason (not related to orders / positions inconsistency, e.g. a network error during validation)
var result = new SnapshotValidationResult
{
Exception = new SnapshotValidationException("Snapshot validation failed", SnapshotValidationError.Unknown, e),
};

return result;
}
}

/// <inheritdoc />
public async Task MakeTradingDataSnapshotFromDraft(
string correlationId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,15 @@ public async Task<SnapshotValidationResult> ValidateCurrentStateAsync()
{
await _log.WriteInfoAsync(nameof(SnapshotValidationService), nameof(ValidateCurrentStateAsync),
$"Snapshot validation started: {DateTime.UtcNow}");
var currentOrders = _orderCache.GetAllOrders();
var currentPositions = _orderCache.GetPositions();
var currentOrdersJson = _orderCache.GetAllOrders().ToJson();
var ordersTimestamp = DateTime.UtcNow;
var currentPositionsJson = _orderCache.GetPositions().ToJson();
var positionsTimestamp = DateTime.UtcNow;

// json is deserialized to create a deep copy
// Order and Position objects are already complex, no sense in adding a specialized deep copy / clone method
var currentOrders = currentOrdersJson.DeserializeJson<ImmutableArray<Order>>();
var currentPositions = currentPositionsJson.DeserializeJson<ImmutableArray<Position>>();

var tradingEngineSnapshot = await _tradingEngineSnapshotsRepository.GetLastAsync();
await _log.WriteInfoAsync(nameof(SnapshotValidationService), nameof(ValidateCurrentStateAsync),
Expand All @@ -62,8 +69,8 @@ await _log.WriteInfoAsync(nameof(SnapshotValidationService), nameof(ValidateCurr
var lastOrders = GetOrders(tradingEngineSnapshot);
var lastPositions = GetPositions(tradingEngineSnapshot);

var ordersHistory = await _ordersHistoryRepository.GetLastSnapshot(tradingEngineSnapshot.Timestamp);
var positionsHistory = await _positionsHistoryRepository.GetLastSnapshot(tradingEngineSnapshot.Timestamp);
var ordersHistory = await _ordersHistoryRepository.GetLastSnapshot(tradingEngineSnapshot.Timestamp, ordersTimestamp);
var positionsHistory = await _positionsHistoryRepository.GetLastSnapshot(tradingEngineSnapshot.Timestamp, positionsTimestamp);

var restoredOrders = RestoreOrdersCurrentStateFromHistory(lastOrders, ordersHistory);
var restoredPositions = RestorePositionsCurrentStateFromHistory(lastPositions, positionsHistory);
Expand Down Expand Up @@ -97,7 +104,8 @@ await _log.WriteWarningAsync(nameof(SnapshotValidationService), nameof(ValidateC
{
Orders = ordersValidationResult,
Positions = positionsValidationResult,
PreviousSnapshotCorrelationId = tradingEngineSnapshot.CorrelationId
PreviousSnapshotCorrelationId = tradingEngineSnapshot.CorrelationId,
Cache = new InMemorySnapshot(currentOrders, currentPositions),
};
}

Expand Down Expand Up @@ -193,7 +201,7 @@ private static ValidationResult<OrderInfo> CompareOrders(ImmutableArray<Order> c
{
Extra = extraOrders.ToList(),
Missed = missedOrders.ToList(),
Inconsistent = inconsistentOrders.ToList()
Inconsistent = inconsistentOrders.ToList(),
};
}

Expand Down Expand Up @@ -225,7 +233,7 @@ private static ValidationResult<PositionInfo> ComparePositions(ImmutableArray<Po
{
Extra = extraPositions.ToList(),
Missed = missedPositions.ToList(),
Inconsistent = inconsistentPositions.ToList()
Inconsistent = inconsistentPositions.ToList(),
};
}

Expand Down
Loading