diff --git a/src/Nethermind/Nethermind.Api/IApiWithStores.cs b/src/Nethermind/Nethermind.Api/IApiWithStores.cs index 09142b2c0ed..459b92e3339 100644 --- a/src/Nethermind/Nethermind.Api/IApiWithStores.cs +++ b/src/Nethermind/Nethermind.Api/IApiWithStores.cs @@ -4,7 +4,6 @@ using Autofac; using Nethermind.Blockchain; using Nethermind.Blockchain.Blocks; -using Nethermind.Blockchain.Find; using Nethermind.Blockchain.Receipts; using Nethermind.Consensus; using Nethermind.Core; diff --git a/src/Nethermind/Nethermind.Blockchain/BlockTree.cs b/src/Nethermind/Nethermind.Blockchain/BlockTree.cs index 9cf43a62c25..f649be65a65 100644 --- a/src/Nethermind/Nethermind.Blockchain/BlockTree.cs +++ b/src/Nethermind/Nethermind.Blockchain/BlockTree.cs @@ -446,6 +446,8 @@ public AddBlockResult SuggestBlock(Block block, BlockTreeSuggestOptions options public Hash256? FindBlockHash(long blockNumber) => GetBlockHashOnMainOrBestDifficultyHash(blockNumber); + public bool HasBlock(long blockNumber, Hash256 blockHash) => _blockStore.HasBlock(blockNumber, blockHash); + public BlockHeader? FindHeader(Hash256? blockHash, BlockTreeLookupOptions options, long? blockNumber = null) { if (blockHash is null || blockHash == Keccak.Zero) diff --git a/src/Nethermind/Nethermind.Blockchain/BlockTreeOverlay.cs b/src/Nethermind/Nethermind.Blockchain/BlockTreeOverlay.cs index c16834f63a8..214a0ada99e 100644 --- a/src/Nethermind/Nethermind.Blockchain/BlockTreeOverlay.cs +++ b/src/Nethermind/Nethermind.Blockchain/BlockTreeOverlay.cs @@ -231,6 +231,9 @@ public void UpdateBeaconMainChain(BlockInfo[]? blockInfos, long clearBeaconMainC public Block? FindBlock(long blockNumber, BlockTreeLookupOptions options) => _overlayTree.FindBlock(blockNumber, options) ?? _baseTree.FindBlock(blockNumber, options); + public bool HasBlock(long blockNumber, Hash256 blockHash) => + _overlayTree.HasBlock(blockNumber, blockHash) || _baseTree.HasBlock(blockNumber, blockHash); + public BlockHeader? FindHeader(Hash256 blockHash, BlockTreeLookupOptions options, long? blockNumber = null) => _overlayTree.FindHeader(blockHash, options, blockNumber) ?? _baseTree.FindHeader(blockHash, options, blockNumber); diff --git a/src/Nethermind/Nethermind.Blockchain/Blocks/BlockStore.cs b/src/Nethermind/Nethermind.Blockchain/Blocks/BlockStore.cs index 98bb1030fff..d4de86e653c 100644 --- a/src/Nethermind/Nethermind.Blockchain/Blocks/BlockStore.cs +++ b/src/Nethermind/Nethermind.Blockchain/Blocks/BlockStore.cs @@ -30,6 +30,13 @@ public void SetMetadata(byte[] key, byte[] value) return blockDb.Get(key); } + public bool HasBlock(long blockNumber, Hash256 blockHash) + { + Span dbKey = stackalloc byte[40]; + KeyValueStoreExtensions.GetBlockNumPrefixedKey(blockNumber, blockHash, dbKey); + return blockDb.KeyExists(dbKey); + } + public void Insert(Block block, WriteFlags writeFlags = WriteFlags.None) { if (block.Hash is null) diff --git a/src/Nethermind/Nethermind.Blockchain/Blocks/IBlockStore.cs b/src/Nethermind/Nethermind.Blockchain/Blocks/IBlockStore.cs index 05bfec926de..d34dde4ace8 100644 --- a/src/Nethermind/Nethermind.Blockchain/Blocks/IBlockStore.cs +++ b/src/Nethermind/Nethermind.Blockchain/Blocks/IBlockStore.cs @@ -24,4 +24,5 @@ public interface IBlockStore // These two are used by blocktree. Try not to use them... void SetMetadata(byte[] key, byte[] value); byte[]? GetMetadata(byte[] key); + bool HasBlock(long blockNumber, Hash256 blockHash); } diff --git a/src/Nethermind/Nethermind.Blockchain/Find/IBlockFinder.cs b/src/Nethermind/Nethermind.Blockchain/Find/IBlockFinder.cs index e65b57f8c49..16788b8c901 100644 --- a/src/Nethermind/Nethermind.Blockchain/Find/IBlockFinder.cs +++ b/src/Nethermind/Nethermind.Blockchain/Find/IBlockFinder.cs @@ -25,6 +25,8 @@ public interface IBlockFinder Block? FindBlock(long blockNumber, BlockTreeLookupOptions options); + bool HasBlock(long blockNumber, Hash256 blockHash); + /// Find a header. blockNumber is optional, but specifying it can improve performance. BlockHeader? FindHeader(Hash256 blockHash, BlockTreeLookupOptions options, long? blockNumber = null); diff --git a/src/Nethermind/Nethermind.Blockchain/ReadOnlyBlockTree.cs b/src/Nethermind/Nethermind.Blockchain/ReadOnlyBlockTree.cs index 506973f7194..45aa4969678 100644 --- a/src/Nethermind/Nethermind.Blockchain/ReadOnlyBlockTree.cs +++ b/src/Nethermind/Nethermind.Blockchain/ReadOnlyBlockTree.cs @@ -94,6 +94,8 @@ public void UpdateHeadBlock(Hash256 blockHash) public Block FindBlock(Hash256 blockHash, BlockTreeLookupOptions options, long? blockNumber = null) => _wrapped.FindBlock(blockHash, options, blockNumber); + public bool HasBlock(long blockNumber, Hash256 blockHash) => _wrapped.HasBlock(blockNumber, blockHash); + public BlockHeader FindHeader(Hash256 blockHash, BlockTreeLookupOptions options, long? blockNumber = null) => _wrapped.FindHeader(blockHash, options, blockNumber: blockNumber); public BlockHeader FindHeader(long blockNumber, BlockTreeLookupOptions options) => _wrapped.FindHeader(blockNumber, options); diff --git a/src/Nethermind/Nethermind.Db.Rocks/Config/DbConfig.cs b/src/Nethermind/Nethermind.Db.Rocks/Config/DbConfig.cs index 62944818adf..73d6ecd1547 100644 --- a/src/Nethermind/Nethermind.Db.Rocks/Config/DbConfig.cs +++ b/src/Nethermind/Nethermind.Db.Rocks/Config/DbConfig.cs @@ -67,6 +67,7 @@ public class DbConfig : IDbConfig public ulong? ReceiptsDbCompactionReadAhead { get; set; } public ulong ReceiptsDbTargetFileSizeBase { get; set; } = (ulong)64.MiB(); public double ReceiptsDbCompressibilityHint { get; set; } = 0.35; + public bool ReceiptsDbOptimizeFiltersForHits { get; set; } = false; public string? ReceiptsDbAdditionalRocksDbOptions { get; set; } = "compaction_pri=kOldestLargestSeqFirst"; public ulong BlocksDbWriteBufferSize { get; set; } = (ulong)64.MiB(); @@ -79,6 +80,7 @@ public class DbConfig : IDbConfig public bool? BlocksDbUseDirectReads { get; set; } public bool? BlocksDbUseDirectIoForFlushAndCompactions { get; set; } public ulong? BlocksDbCompactionReadAhead { get; set; } + public bool BlocksDbOptimizeFiltersForHits { get; set; } = false; public string? BlocksDbAdditionalRocksDbOptions { get; set; } = "compaction_pri=kOldestLargestSeqFirst"; public ulong HeadersDbWriteBufferSize { get; set; } = (ulong)8.MiB(); diff --git a/src/Nethermind/Nethermind.Db.Rocks/Config/IDbConfig.cs b/src/Nethermind/Nethermind.Db.Rocks/Config/IDbConfig.cs index a1d6af06eb6..0af4a0e66e0 100644 --- a/src/Nethermind/Nethermind.Db.Rocks/Config/IDbConfig.cs +++ b/src/Nethermind/Nethermind.Db.Rocks/Config/IDbConfig.cs @@ -68,6 +68,7 @@ public interface IDbConfig : IConfig ulong? ReceiptsDbCompactionReadAhead { get; set; } ulong ReceiptsDbTargetFileSizeBase { get; set; } double ReceiptsDbCompressibilityHint { get; set; } + bool ReceiptsDbOptimizeFiltersForHits { get; set; } string? ReceiptsDbAdditionalRocksDbOptions { get; set; } ulong BlocksDbWriteBufferSize { get; set; } @@ -80,6 +81,7 @@ public interface IDbConfig : IConfig bool? BlocksDbUseDirectReads { get; set; } bool? BlocksDbUseDirectIoForFlushAndCompactions { get; set; } ulong? BlocksDbCompactionReadAhead { get; set; } + bool BlocksDbOptimizeFiltersForHits { get; set; } string? BlocksDbAdditionalRocksDbOptions { get; set; } ulong HeadersDbWriteBufferSize { get; set; } diff --git a/src/Nethermind/Nethermind.Facade/Simulate/SimulateDictionaryBlockStore.cs b/src/Nethermind/Nethermind.Facade/Simulate/SimulateDictionaryBlockStore.cs index c3825373730..05a4fe994e3 100644 --- a/src/Nethermind/Nethermind.Facade/Simulate/SimulateDictionaryBlockStore.cs +++ b/src/Nethermind/Nethermind.Facade/Simulate/SimulateDictionaryBlockStore.cs @@ -81,4 +81,9 @@ public void SetMetadata(byte[] key, byte[] value) { return _metadataDict.TryGetValue(key, out var value) ? value : readonlyBaseBlockStore.GetMetadata(key); } + + public bool HasBlock(long blockNumber, Hash256 blockHash) + { + return _blockNumDict.ContainsKey(blockNumber); + } } diff --git a/src/Nethermind/Nethermind.Synchronization.Test/FastBlocks/BodiesSyncFeedTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/FastBlocks/BodiesSyncFeedTests.cs index 8e4e7a52d99..98bf09d31f8 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/FastBlocks/BodiesSyncFeedTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/FastBlocks/BodiesSyncFeedTests.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using FluentAssertions; using Nethermind.Blockchain; +using Nethermind.Blockchain.Blocks; using Nethermind.Blockchain.Synchronization; using Nethermind.Core; using Nethermind.Core.Extensions; @@ -116,6 +117,29 @@ async Task HandleAndPrepareNextRequest() req.Dispose(); } + [Test] + public async Task ShouldNotReDownloadExistingBlock() + { + _feed.InitializeFeed(); + + _syncingToBlockTree.Insert(_syncingFromBlockTree.FindBlock(_pivotBlock.Number - 2)!); + _syncingToBlockTree.Insert(_syncingFromBlockTree.FindBlock(_pivotBlock.Number - 4)!); + + using BodiesSyncBatch req = (await _feed.PrepareRequest())!; + req.Infos + .Where((bi) => bi is not null) + .Select((bi) => bi!.BlockNumber) + .Take(4) + .Should() + .BeEquivalentTo([ + _pivotBlock.Number, + _pivotBlock.Number - 1, + // Skipped + _pivotBlock.Number - 3, + // Skipped + _pivotBlock.Number - 5]); + } + [Test] public async Task ShouldRecoverOnInsertFailure() { diff --git a/src/Nethermind/Nethermind.Synchronization.Test/FastBlocks/SyncStatusListTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/FastBlocks/SyncStatusListTests.cs index 98a375ba26a..bbd4ea0774c 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/FastBlocks/SyncStatusListTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/FastBlocks/SyncStatusListTests.cs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: LGPL-3.0-only using System; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using FluentAssertions; @@ -10,6 +11,7 @@ using Nethermind.Core.Test.Builders; using Nethermind.Synchronization.FastBlocks; using NSubstitute; +using NSubstitute.Core; using NUnit.Framework; namespace Nethermind.Synchronization.Test.FastBlocks; @@ -50,12 +52,44 @@ public void Will_not_go_below_ancient_barrier() blockTree.FindCanonicalBlockInfo(Arg.Any()).Returns(new BlockInfo(TestItem.KeccakA, 0)); SyncStatusList syncStatusList = new SyncStatusList(blockTree, 1000, null, 900); - BlockInfo?[] infos = new BlockInfo?[500]; - syncStatusList.GetInfosForBatch(infos); + BlockInfo?[] infos; + syncStatusList.TryGetInfosForBatch(500, (_) => false, out infos); infos.Count((it) => it is not null).Should().Be(101); } + [Test] + public void Will_skip_existing_keys() + { + IBlockTree blockTree = Substitute.For(); + blockTree.FindCanonicalBlockInfo(Arg.Any()) + .Returns((Func)((ci) => + { + long blockNumber = (long)ci[0]; + return new BlockInfo(TestItem.KeccakA, 0) + { + BlockNumber = blockNumber + }; + })); + + SyncStatusList syncStatusList = new SyncStatusList(blockTree, 100000, null, 1000); + + HashSet needToFetchBlocks = [99999, 99995, 99950, 99000, 99001, 99003, 85000]; + + List TryGetInfos() + { + BlockInfo?[] infos; + syncStatusList.TryGetInfosForBatch(50, (bi) => !needToFetchBlocks.Contains(bi.BlockNumber), out infos); + return infos.Where(bi => bi != null).Select((bi) => bi!.BlockNumber).ToList(); + } + + TryGetInfos().Should().BeEquivalentTo([99999, 99995]); // first two as it will try the first 50 only + TryGetInfos().Should().BeEquivalentTo([99950]); // Then the next 50 + TryGetInfos().Should().BeEquivalentTo([99000, 99001, 99003]); // If the next 50 failed, it will try looking far back. + TryGetInfos().Should().BeEmpty(); // If it look far back enough and still does not find anything it will just return so that progress can update. + TryGetInfos().Should().BeEquivalentTo([85000]); // But as the existing blocks was already marked as inserted, it should be able to make progress on later call. + } + [Test] public void Can_read_back_all_parallel_set_values() { diff --git a/src/Nethermind/Nethermind.Synchronization.Test/ReceiptSyncFeedTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/ReceiptSyncFeedTests.cs index d92204e234e..eef137e816f 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/ReceiptSyncFeedTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/ReceiptSyncFeedTests.cs @@ -9,6 +9,7 @@ using Nethermind.Blockchain.Receipts; using Nethermind.Blockchain.Synchronization; using Nethermind.Core; +using Nethermind.Core.Crypto; using Nethermind.Core.Extensions; using Nethermind.Core.Test.Builders; using Nethermind.Db; @@ -25,54 +26,72 @@ namespace Nethermind.Synchronization.Test; public class ReceiptSyncFeedTests { + private IBlockTree _syncingFromBlockTree = null!; + private IBlockTree _syncingToBlockTree = null!; + private ReceiptsSyncFeed _feed = null!; + private ISyncConfig _syncConfig = null!; + private Block _pivotBlock = null!; + private InMemoryReceiptStorage _syncingFromReceiptStore; + private IReceiptStorage _receiptStorage; - [Test] - public async Task ShouldRecoverOnInsertFailure() + [SetUp] + public void Setup() { - InMemoryReceiptStorage syncingFromReceiptStore = new InMemoryReceiptStorage(); - BlockTree syncingFromBlockTree = Build.A.BlockTree() - .WithTransactions(syncingFromReceiptStore) + _syncingFromReceiptStore = new InMemoryReceiptStorage(); + _syncingFromBlockTree = Build.A.BlockTree() + .WithTransactions(_syncingFromReceiptStore) .OfChainLength(100) .TestObject; - BlockTree syncingTooBlockTree = Build.A.BlockTree() + _receiptStorage = Substitute.For(); + _syncingToBlockTree = Build.A.BlockTree() .TestObject; for (int i = 1; i < 100; i++) { - Block block = syncingFromBlockTree.FindBlock(i, BlockTreeLookupOptions.None)!; - syncingTooBlockTree.Insert(block.Header); - syncingTooBlockTree.Insert(block); + Block block = _syncingFromBlockTree.FindBlock(i, BlockTreeLookupOptions.None)!; + _syncingToBlockTree.Insert(block.Header); + _syncingToBlockTree.Insert(block); } - Block pivot = syncingFromBlockTree.FindBlock(99, BlockTreeLookupOptions.None)!; + _pivotBlock = _syncingFromBlockTree.FindBlock(99, BlockTreeLookupOptions.None)!; - SyncConfig syncConfig = new() + _syncConfig = new SyncConfig() { FastSync = true, - PivotHash = pivot.Hash!.ToString(), - PivotNumber = pivot.Number.ToString(), + PivotHash = _pivotBlock.Hash!.ToString(), + PivotNumber = _pivotBlock.Number.ToString(), AncientBodiesBarrier = 0, DownloadBodiesInFastSync = true, }; - IReceiptStorage receiptStorage = Substitute.For(); - ReceiptsSyncFeed syncFeed = new ReceiptsSyncFeed( + _feed = new ReceiptsSyncFeed( MainnetSpecProvider.Instance, - syncingTooBlockTree, - receiptStorage, + _syncingToBlockTree, + _receiptStorage, Substitute.For(), - syncConfig, + _syncConfig, new NullSyncReport(), new MemDb(), LimboLogs.Instance ); - syncFeed.InitializeFeed(); + } + + [TearDown] + public void TearDown() + { + _feed.Dispose(); + } + + [Test] + public async Task ShouldRecoverOnInsertFailure() + { + _feed.InitializeFeed(); - using ReceiptsSyncBatch req = (await syncFeed.PrepareRequest())!; - req.Response = req.Infos.Take(8).Select(info => syncingFromReceiptStore.Get(info!.BlockHash)).ToPooledList(8)!; + using ReceiptsSyncBatch req = (await _feed.PrepareRequest())!; + req.Response = req.Infos.Take(8).Select(info => _syncingFromReceiptStore.Get(info!.BlockHash)).ToPooledList(8)!; - receiptStorage + _receiptStorage .When((it) => it.Insert(Arg.Any(), Arg.Any(), Arg.Any())) .Do((callInfo) => { @@ -80,9 +99,32 @@ public async Task ShouldRecoverOnInsertFailure() if (block.Number == 95) throw new Exception("test exception"); }); - Func act = () => syncFeed.HandleResponse(req); + Func act = () => _feed.HandleResponse(req); act.Should().Throw(); - using ReceiptsSyncBatch req2 = (await syncFeed.PrepareRequest())!; + using ReceiptsSyncBatch req2 = (await _feed.PrepareRequest())!; req2.Infos[0]!.BlockNumber.Should().Be(95); } + + [Test] + public async Task ShouldNotRedownloadExistingReceipts() + { + _feed.InitializeFeed(); + _receiptStorage.HasBlock(Arg.Is(_pivotBlock.Number - 2), Arg.Any()).Returns(true); + _receiptStorage.HasBlock(Arg.Is(_pivotBlock.Number - 4), Arg.Any()).Returns(true); + + using ReceiptsSyncBatch req = (await _feed.PrepareRequest())!; + + req.Infos + .Where((bi) => bi is not null) + .Select((bi) => bi!.BlockNumber) + .Take(4) + .Should() + .BeEquivalentTo([ + _pivotBlock.Number, + _pivotBlock.Number - 1, + // Skipped + _pivotBlock.Number - 3, + // Skipped + _pivotBlock.Number - 5]); + } } diff --git a/src/Nethermind/Nethermind.Synchronization/FastBlocks/BodiesSyncFeed.cs b/src/Nethermind/Nethermind.Synchronization/FastBlocks/BodiesSyncFeed.cs index 1152f416054..edf132f066c 100644 --- a/src/Nethermind/Nethermind.Synchronization/FastBlocks/BodiesSyncFeed.cs +++ b/src/Nethermind/Nethermind.Synchronization/FastBlocks/BodiesSyncFeed.cs @@ -7,6 +7,7 @@ using Autofac.Features.AttributeFilters; using Microsoft.Extensions.DependencyInjection; using Nethermind.Blockchain; +using Nethermind.Blockchain.Blocks; using Nethermind.Blockchain.Synchronization; using Nethermind.Consensus.Validators; using Nethermind.Core; @@ -130,11 +131,20 @@ private void PostFinishCleanUp() BodiesSyncBatch? batch = null; if (ShouldBuildANewBatch()) { - BlockInfo?[] infos = new BlockInfo[_requestSize]; - _syncStatusList.GetInfosForBatch(infos); + BlockInfo?[] infos = null; + while (!_syncStatusList.TryGetInfosForBatch(_requestSize, (info) => _blockTree.HasBlock(info.BlockNumber, info.BlockHash), out infos)) + { + token.ThrowIfCancellationRequested(); + + // Otherwise, the progress does not update correctly + _blockTree.LowestInsertedBodyNumber = _syncStatusList.LowestInsertWithoutGaps; + UpdateSyncReport(); + } + if (infos[0] is not null) { batch = new BodiesSyncBatch(infos); + // Used for peer allocation. It pick peer which have the at least this number batch.MinNumber = infos[0].BlockNumber; batch.Prioritized = true; } diff --git a/src/Nethermind/Nethermind.Synchronization/FastBlocks/ReceiptsSyncFeed.cs b/src/Nethermind/Nethermind.Synchronization/FastBlocks/ReceiptsSyncFeed.cs index cbfce46215c..0869d2bcbbc 100644 --- a/src/Nethermind/Nethermind.Synchronization/FastBlocks/ReceiptsSyncFeed.cs +++ b/src/Nethermind/Nethermind.Synchronization/FastBlocks/ReceiptsSyncFeed.cs @@ -133,8 +133,14 @@ private void PostFinishCleanUp() ReceiptsSyncBatch? batch = null; if (ShouldBuildANewBatch()) { - BlockInfo?[] infos = new BlockInfo[_requestSize]; - _syncStatusList.GetInfosForBatch(infos); + BlockInfo?[] infos = null; + while (!_syncStatusList.TryGetInfosForBatch(_requestSize, (info) => _receiptStorage.HasBlock(info.BlockNumber, info.BlockHash), out infos)) + { + token.ThrowIfCancellationRequested(); + _receiptStorage.LowestInsertedReceiptBlockNumber = _syncStatusList.LowestInsertWithoutGaps; + UpdateSyncReport(); + } + if (infos[0] is not null) { batch = new ReceiptsSyncBatch(infos); @@ -280,11 +286,9 @@ private int InsertReceipts(ReceiptsSyncBatch batch) } } + UpdateSyncReport(); AdjustRequestSize(batch, validResponsesCount); LogPostProcessingBatchInfo(batch, validResponsesCount); - - _syncReport.FastBlocksReceipts.Update(_pivotNumber - _syncStatusList.LowestInsertWithoutGaps); - _syncReport.ReceiptsInQueue.Update(_syncStatusList.QueueSize); return validResponsesCount; } @@ -295,6 +299,12 @@ private void LogPostProcessingBatchInfo(ReceiptsSyncBatch batch, int validRespon $"{nameof(ReceiptsSyncBatch)} back from {batch.ResponseSourcePeer} with {validResponsesCount}/{batch.Infos.Length}"); } + private void UpdateSyncReport() + { + _syncReport.FastBlocksReceipts.Update(_pivotNumber - _syncStatusList.LowestInsertWithoutGaps); + _syncReport.ReceiptsInQueue.Update(_syncStatusList.QueueSize); + } + private void AdjustRequestSize(ReceiptsSyncBatch batch, int validResponsesCount) { int currentRequestSize = Volatile.Read(ref _requestSize); diff --git a/src/Nethermind/Nethermind.Synchronization/FastBlocks/SyncStatusList.cs b/src/Nethermind/Nethermind.Synchronization/FastBlocks/SyncStatusList.cs index 373afedd1d2..208d355a844 100644 --- a/src/Nethermind/Nethermind.Synchronization/FastBlocks/SyncStatusList.cs +++ b/src/Nethermind/Nethermind.Synchronization/FastBlocks/SyncStatusList.cs @@ -3,14 +3,17 @@ using System; using System.Threading; +using System.Threading.Tasks; using Nethermind.Blockchain; using Nethermind.Core; using Nethermind.Core.Caching; +using Nethermind.Core.Collections; namespace Nethermind.Synchronization.FastBlocks { internal class SyncStatusList { + private const int ParallelExistCheckSize = 1024; private long _queueSize; private readonly IBlockTree _blockTree; private readonly FastBlockStatusList _statuses; @@ -35,7 +38,7 @@ public SyncStatusList(IBlockTree blockTree, long pivotNumber, long? lowestInsert _lowerBound = lowerBound; } - public void GetInfosForBatch(BlockInfo?[] blockInfos) + private void GetInfosForBatch(Span blockInfos) { int collected = 0; long currentNumber = Volatile.Read(ref _lowestInsertWithoutGaps); @@ -77,6 +80,90 @@ public void GetInfosForBatch(BlockInfo?[] blockInfos) } } + /// + /// Try get block infos of size `batchSize`. + /// + /// + /// + /// + /// + public bool TryGetInfosForBatch(int batchSize, Func blockExist, out BlockInfo?[] infos) + { + ArrayPoolList workingArray = new(batchSize, batchSize); + + // Need to be a max attempt to update sync progress + const int maxAttempt = 8; + for (int attempt = 0; attempt < maxAttempt; attempt++) + { + // Because the last clause of GetInfosForBatch increment the _lowestInsertWithoutGap need to be run + // sequentially, can't find an easy way to parallelize the checking for block exist part in the check + // So here we are... + GetInfosForBatch(workingArray.AsSpan()); + + (bool hasNonNull, bool hasInserted) = ClearExistingBlock(); + + if (hasNonNull || !hasInserted) + { + CompileOutput(out infos); + return true; + } + + // At this point, hasNonNull is false and hasInserted is true, meaning all entry in workingArray + // already exist. We switch to a bigger array to improve parallelization throughput + if (workingArray.Count < ParallelExistCheckSize) + { + workingArray = new ArrayPoolList(ParallelExistCheckSize, ParallelExistCheckSize); + } + } + + infos = []; + return false; + + (bool, bool) ClearExistingBlock() + { + bool hasNonNull = false; + bool hasInserted = false; + Parallel.For(0, workingArray.Count, (i) => + { + if (workingArray[i] is not null) + { + if (blockExist(workingArray[i])) + { + MarkInserted(workingArray[i].BlockNumber); + hasInserted = true; + workingArray[i] = null; + } + else + { + hasNonNull = true; + } + } + }); + return (hasNonNull, hasInserted); + } + + void CompileOutput(out BlockInfo?[] outputArray) + { + int slot = 0; + outputArray = new BlockInfo?[batchSize]; + for (int i = 0; i < workingArray.Count; i++) + { + if (workingArray[i] is null) continue; + + if (slot < outputArray.Length) + { + outputArray[slot] = workingArray[i]; + slot++; + } + else + { + // Not enough space in output we'll need to put back the block + MarkPending(workingArray[i]); + } + } + } + } + public void MarkInserted(long blockNumber) { if (_statuses.TrySet(blockNumber, FastBlockStatus.Inserted))