Skip to content

Commit

Permalink
Merge pull request #516 from LykkeBusiness/LT-5012-eod-borders
Browse files Browse the repository at this point in the history
Lt 5012 eod borders
  • Loading branch information
gponomarev-lykke authored Mar 26, 2024
2 parents b92a31b + 64b9b41 commit 73d3cee
Show file tree
Hide file tree
Showing 25 changed files with 432 additions and 52 deletions.
27 changes: 1 addition & 26 deletions src/MarginTrading.Backend.Core/Services/ISnapshotService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,5 @@

namespace MarginTrading.Backend.Core.Services
{
public interface ISnapshotService
{
/// <summary>
/// Make final trading snapshot from current system state
/// </summary>
/// <param name="tradingDay"></param>
/// <param name="correlationId"></param>
/// <param name="status"></param>
/// <returns></returns>
Task<string> MakeTradingDataSnapshot(
DateTime tradingDay,
string correlationId,
SnapshotStatus status = SnapshotStatus.Final);

/// <summary>
/// Make final trading snapshot from draft
/// </summary>
/// <param name="correlationId"></param>
/// <param name="cfdQuotes"></param>
/// <param name="fxRates"></param>
/// <returns></returns>
Task MakeTradingDataSnapshotFromDraft(
string correlationId,
IEnumerable<ClosingAssetPrice> cfdQuotes,
IEnumerable<ClosingFxRate> fxRates);
}

}
20 changes: 20 additions & 0 deletions src/MarginTrading.Backend.Core/Services/ISnapshotStatusTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) 2019 Lykke Corp.
// See the LICENSE file in the project root for more information.

using System;

namespace MarginTrading.Backend.Core.Services
{
/// <summary>
/// This status tracker decouples draft snapshot creation from rabbit mq
/// Normal flow: <see cref="MarginTrading.Backend.Contracts.TradingSchedule.MarketStateChangedEvent" /> generated => handled in PlatformClosureProjection => snapshot saved
/// Degraded flow: <see cref="MarginTrading.Backend.Contracts.TradingSchedule.MarketStateChangedEvent" /> generated, snapshot requested => event not received in PlatformClosureProjection => SnapshotMonitoringService retries snapshot creation after a timeout
/// </summary>
public interface ISnapshotStatusTracker
{
void SnapshotRequested(DateTime tradingDay);
void SnapshotInProgress();
void SnapshotCreated();
bool ShouldRetrySnapshot(out DateTime tradingDay);
}
}
18 changes: 18 additions & 0 deletions src/MarginTrading.Backend.Core/Settings/BookKeeperServiceClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) 2019 Lykke Corp.
// See the LICENSE file in the project root for more information.

using JetBrains.Annotations;
using Lykke.SettingsReader.Attributes;

namespace MarginTrading.Backend.Core.Settings
{
[UsedImplicitly]
public class BookKeeperServiceClient
{
// isAlive check leads to a deadlock
public string ServiceUrl { get; set; }

[Optional]
public string ApiKey { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using JetBrains.Annotations;
using Lykke.Common.Chaos;
using Lykke.SettingsReader.Attributes;
using MarginTrading.Backend.Core.Services;
using MarginTrading.Common.RabbitMq;
using MarginTrading.Common.Settings;

Expand Down Expand Up @@ -129,5 +130,7 @@ public class MarginTradingSettings

// todo: probably should be moved turned in to a feature flag
[Optional] public bool PerformanceTrackerEnabled { get; set; } = false;

[Optional] public SnapshotMonitorSettings SnapshotMonitorSettings { get; set; } = new SnapshotMonitorSettings();
}
}
20 changes: 20 additions & 0 deletions src/MarginTrading.Backend.Core/Settings/SnapshotMonitorSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) 2019 Lykke Corp.
// See the LICENSE file in the project root for more information.

using System;

namespace MarginTrading.Backend.Core.Settings
{
public class SnapshotMonitorSettings
{
/// <summary>
/// Defines the interval between consecutive checks performed by the SnapshotMonitoringService service.
/// </summary>
public TimeSpan MonitoringDelay { get; set; } = TimeSpan.FromSeconds(30);

/// <summary>
/// If snapshot is not created after a specified amount of time, creation will be retried
/// </summary>
public TimeSpan DelayBeforeFallbackSnapshot { get; set; } = TimeSpan.FromMinutes(5);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
using MarginTrading.Common.Services;
using MarginTrading.AssetService.Contracts;
using MarginTrading.AssetService.Contracts.Scheduling;
using MarginTrading.Backend.Core.Services;
using MarginTrading.Backend.Services.Extensions;
using Microsoft.FeatureManagement;
using MoreLinq;

Expand Down Expand Up @@ -49,6 +51,7 @@ public class ScheduleSettingsCacheService : IScheduleSettingsCacheService

private readonly ReaderWriterLockSlim _readerWriterLockSlim = new ReaderWriterLockSlim();
private readonly IFeatureManager _featureManager;
private readonly ISnapshotStatusTracker _snapshotStatusTracker;

public ScheduleSettingsCacheService(
ICqrsSender cqrsSender,
Expand All @@ -57,7 +60,8 @@ public ScheduleSettingsCacheService(
IDateService dateService,
ILog log,
OvernightMarginSettings overnightMarginSettings,
IFeatureManager featureManager)
IFeatureManager featureManager,
ISnapshotStatusTracker snapshotStatusTracker)
{
_cqrsSender = cqrsSender;
_scheduleSettingsApi = scheduleSettingsApi;
Expand All @@ -66,6 +70,7 @@ public ScheduleSettingsCacheService(
_log = log;
_overnightMarginSettings = overnightMarginSettings;
_featureManager = featureManager;
_snapshotStatusTracker = snapshotStatusTracker;
}

public async Task UpdateAllSettingsAsync()
Expand Down Expand Up @@ -185,12 +190,20 @@ private void HandleMarketStateChangesUnsafe(DateTime currentTime, string[] marke
.Where(x => marketIds.IsNullOrEmpty() || marketIds.Contains(x.Key)))
{
var newState = scheduleSettings.GetMarketState(marketId, currentTime);
_cqrsSender.PublishEvent(new MarketStateChangedEvent

var now = _dateService.Now();
var ev = new MarketStateChangedEvent
{
Id = marketId,
IsEnabled = newState.IsEnabled,
EventTimestamp = _dateService.Now(),
});
EventTimestamp = now,
};

if (ev.IsPlatformClosureEvent())
{
_snapshotStatusTracker.SnapshotRequested(now.Date);
}
_cqrsSender.PublishEvent(ev);

_marketStates[marketId] = newState;
}
Expand Down
25 changes: 25 additions & 0 deletions src/MarginTrading.Backend.Services/DraftSnapshotKeeperFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2019 Lykke Corp.
// See the LICENSE file in the project root for more information.

using System;
using MarginTrading.Backend.Core.Repositories;

namespace MarginTrading.Backend.Services
{
public class DraftSnapshotKeeperFactory : IDraftSnapshotKeeperFactory
{
private readonly ITradingEngineSnapshotsRepository _tradingEngineSnapshotsRepository;

public DraftSnapshotKeeperFactory(ITradingEngineSnapshotsRepository tradingEngineSnapshotsRepository)
{
_tradingEngineSnapshotsRepository = tradingEngineSnapshotsRepository;
}

public IDraftSnapshotKeeper Create(DateTime tradingDay)
{
var draftSnapshotKeeper = new DraftSnapshotKeeper(_tradingEngineSnapshotsRepository);
draftSnapshotKeeper.Init(tradingDay);
return draftSnapshotKeeper;
}
}
}
12 changes: 12 additions & 0 deletions src/MarginTrading.Backend.Services/IDraftSnapshotKeeperFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) 2019 Lykke Corp.
// See the LICENSE file in the project root for more information.

using System;

namespace MarginTrading.Backend.Services
{
public interface IDraftSnapshotKeeperFactory
{
IDraftSnapshotKeeper Create(DateTime tradingDay);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public interface IFinalSnapshotCalculator
Task<TradingEngineSnapshot> RunAsync(
IEnumerable<ClosingFxRate> fxRates,
IEnumerable<ClosingAssetPrice> cfdQuotes,
string correlationId);
string correlationId,
IDraftSnapshotKeeper draftSnapshotKeeper = null);
}
}
39 changes: 39 additions & 0 deletions src/MarginTrading.Backend.Services/ISnapshotService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) 2019 Lykke Corp.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using MarginTrading.Backend.Contracts.Prices;
using MarginTrading.Backend.Core.Snapshots;

namespace MarginTrading.Backend.Services
{
public interface ISnapshotService
{
/// <summary>
/// Make final trading snapshot from current system state
/// </summary>
/// <param name="tradingDay"></param>
/// <param name="correlationId"></param>
/// <param name="status"></param>
/// <returns></returns>
Task<string> MakeTradingDataSnapshot(
DateTime tradingDay,
string correlationId,
SnapshotStatus status = SnapshotStatus.Final);

/// <summary>
/// Make final trading snapshot from draft
/// </summary>
/// <param name="correlationId"></param>
/// <param name="cfdQuotes"></param>
/// <param name="fxRates"></param>
/// <returns></returns>
Task MakeTradingDataSnapshotFromDraft(
string correlationId,
IEnumerable<ClosingAssetPrice> cfdQuotes,
IEnumerable<ClosingFxRate> fxRates,
IDraftSnapshotKeeper draftSnapshotKeeper = null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class SnapshotService : ISnapshotService
private readonly IMarginTradingBlobRepository _blobRepository;
private readonly ILog _log;
private readonly IFinalSnapshotCalculator _finalSnapshotCalculator;
private readonly ISnapshotStatusTracker _snapshotStatusTracker;
private readonly MarginTradingSettings _settings;

private static readonly SemaphoreSlim Lock = new SemaphoreSlim(1, 1);
Expand All @@ -54,6 +55,7 @@ public SnapshotService(
IMarginTradingBlobRepository blobRepository,
ILog log,
IFinalSnapshotCalculator finalSnapshotCalculator,
ISnapshotStatusTracker snapshotStatusTracker,
MarginTradingSettings settings)
{
_scheduleSettingsCacheService = scheduleSettingsCacheService;
Expand All @@ -68,6 +70,7 @@ public SnapshotService(
_blobRepository = blobRepository;
_log = log;
_finalSnapshotCalculator = finalSnapshotCalculator;
_snapshotStatusTracker = snapshotStatusTracker;
_settings = settings;
}

Expand Down Expand Up @@ -123,6 +126,8 @@ await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapsho

try
{
_snapshotStatusTracker.SnapshotInProgress();

var orders = _orderReader.GetAllOrders();
var ordersJson = orders.Select(o => o.ConvertToSnapshotContract(_orderReader, status)).ToJson();
await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapshot),
Expand Down Expand Up @@ -162,6 +167,11 @@ await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapsho
var accountsJson = accountStats
.Select(a => a.ConvertToSnapshotContract(accountsInLiquidation.Contains(a), status))
.ToJson();

// timestamp will be used as an eod border
// setting it as close as possible to accountStats retrieval
var timestamp = _dateService.Now();

await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapshot),
$"Preparing data... {accountStats.Count} accounts prepared.");

Expand All @@ -183,7 +193,7 @@ await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapsho
var snapshot = new TradingEngineSnapshot(
tradingDay,
correlationId,
_dateService.Now(),
timestamp,
ordersJson: ordersJson,
positionsJson: positionsJson,
accountsJson: accountsJson,
Expand All @@ -192,6 +202,8 @@ await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapsho
status: status);

await _tradingEngineSnapshotsRepository.AddAsync(snapshot);

_snapshotStatusTracker.SnapshotCreated();

await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapshot),
$"Trading data snapshot was written to the storage. {msg}");
Expand All @@ -207,7 +219,8 @@ await _log.WriteInfoAsync(nameof(SnapshotService), nameof(MakeTradingDataSnapsho
public async Task MakeTradingDataSnapshotFromDraft(
string correlationId,
IEnumerable<ClosingAssetPrice> cfdQuotes,
IEnumerable<ClosingFxRate> fxRates)
IEnumerable<ClosingFxRate> fxRates,
IDraftSnapshotKeeper draftSnapshotKeeper = null)
{
if (IsMakingSnapshotInProgress)
{
Expand All @@ -217,8 +230,7 @@ public async Task MakeTradingDataSnapshotFromDraft(
await Lock.WaitAsync();
try
{
var snapshot = await _finalSnapshotCalculator.RunAsync(fxRates, cfdQuotes, correlationId);

var snapshot = await _finalSnapshotCalculator.RunAsync(fxRates, cfdQuotes, correlationId, draftSnapshotKeeper);
await _tradingEngineSnapshotsRepository.AddAsync(snapshot);
}
finally
Expand Down
Loading

0 comments on commit 73d3cee

Please sign in to comment.