From de50d0be4979e8979041a3f2c057525f9e90ee44 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Mon, 27 Jan 2025 18:30:40 +0300 Subject: [PATCH 1/3] bqueue: make queue generic The queue can operate with any item that implements proper interface. Signed-off-by: Ekaterina Pavlova --- internal/fakechain/fakechain.go | 5 ++ pkg/consensus/consensus.go | 4 +- pkg/consensus/consensus_test.go | 2 +- pkg/core/block/block.go | 7 ++ pkg/core/block/header.go | 7 ++ pkg/core/statesync/module.go | 5 ++ pkg/network/bqueue/queue.go | 115 ++++++++++++++++--------------- pkg/network/bqueue/queue_test.go | 43 +++++++++--- pkg/network/bqueue_adapters.go | 53 ++++++++++++++ pkg/network/server.go | 36 +++++----- pkg/network/state_sync.go | 3 +- 11 files changed, 194 insertions(+), 86 deletions(-) create mode 100644 pkg/network/bqueue_adapters.go diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index a2553b61a1..6639945d0d 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -49,6 +49,11 @@ type FakeStateSync struct { AddMPTNodesFunc func(nodes [][]byte) error } +// HeaderHeight returns the height of the latest stored header. +func (s *FakeStateSync) HeaderHeight() uint32 { + return 0 +} + // NewFakeChain returns a new FakeChain structure. func NewFakeChain() *FakeChain { return NewFakeChainWithCustomCfg(nil) diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 5f251c8ed2..1e5c03e899 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -58,7 +58,7 @@ type Ledger interface { // BlockQueuer is an interface to the block queue manager sufficient for Service. type BlockQueuer interface { - PutBlock(block *coreb.Block) error + Put(queueable *coreb.Block) error } // Service represents a consensus instance. @@ -623,7 +623,7 @@ func (s *service) processBlock(b dbft.Block[util.Uint256]) error { bb := &b.(*neoBlock).Block bb.Script = *(s.getBlockWitness(bb)) - if err := s.BlockQueue.PutBlock(bb); err != nil { + if err := s.BlockQueue.Put(bb); err != nil { // The block might already be added via the regular network // interaction. if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil { diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index c7066aaca1..1156725d86 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -528,7 +528,7 @@ type testBlockQueuer struct { var _ = BlockQueuer(testBlockQueuer{}) // PutBlock implements BlockQueuer interface. -func (bq testBlockQueuer) PutBlock(b *coreb.Block) error { +func (bq testBlockQueuer) Put(b *coreb.Block) error { return bq.bc.AddBlock(b) } diff --git a/pkg/core/block/block.go b/pkg/core/block/block.go index 9b246a4d3a..88f78dc5a5 100644 --- a/pkg/core/block/block.go +++ b/pkg/core/block/block.go @@ -51,6 +51,13 @@ type auxBlockIn struct { Transactions []json.RawMessage `json:"tx"` } +// GetIndex returns the index of the block. This method should be used +// for interfaces only. As generics don't support structural types +// ref. golang/go#51259. +func (b *Block) GetIndex() uint32 { + return b.Index +} + // ComputeMerkleRoot computes Merkle tree root hash based on actual block's data. 
func (b *Block) ComputeMerkleRoot() util.Uint256 { hashes := make([]util.Uint256, len(b.Transactions)) diff --git a/pkg/core/block/header.go b/pkg/core/block/header.go index 2edd763dc5..b056b0c671 100644 --- a/pkg/core/block/header.go +++ b/pkg/core/block/header.go @@ -80,6 +80,13 @@ type baseAux struct { Witnesses []transaction.Witness `json:"witnesses"` } +// GetIndex returns the index of the block. This method should be used +// for interfaces only. As generics don't support structural types +// ref. golang/go#51259. +func (b *Header) GetIndex() uint32 { + return b.Index +} + // Hash returns the hash of the block. Notice that it is cached internally, // so no matter how you change the [Header] after the first invocation of this // method it won't change. To get an updated hash in case you're changing diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go index 4915a279fc..0d26a861f4 100644 --- a/pkg/core/statesync/module.go +++ b/pkg/core/statesync/module.go @@ -508,3 +508,8 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 { return s.mptpool.GetBatch(limit) } + +// HeaderHeight returns the height of the latest stored header. +func (s *Module) HeaderHeight() uint32 { + return s.bc.HeaderHeight() +} diff --git a/pkg/network/bqueue/queue.go b/pkg/network/bqueue/queue.go index 2899b1ac51..38d22f817a 100644 --- a/pkg/network/bqueue/queue.go +++ b/pkg/network/bqueue/queue.go @@ -5,108 +5,115 @@ import ( "sync/atomic" "time" - "github.com/nspcc-dev/neo-go/pkg/core/block" "go.uber.org/zap" ) -// Blockqueuer is an interface for a block queue. -type Blockqueuer interface { - AddBlock(block *block.Block) error - AddHeaders(...*block.Header) error - BlockHeight() uint32 +// Queuer is an interface for a queue. +type Queuer[Q Queueable] interface { + AddItem(Q) error + AddItems(...Q) error + Height() uint32 } -// OperationMode is the mode of operation for the block queue. +// OperationMode is the mode of operation for the queue. // Could be either Blocking or NonBlocking. type OperationMode byte const ( - // NonBlocking means that PutBlock will return immediately if the queue is full. + // NonBlocking means that Put will return immediately if the queue is full. NonBlocking OperationMode = 0 - // Blocking means that PutBlock will wait until there is enough space in the queue. + // Blocking means that Put will wait until there is enough space in the queue. Blocking OperationMode = 1 ) -// Queue is the block queue. -type Queue struct { +// Queueable is an interface for a queue element. +type Queueable interface { + GetIndex() uint32 + comparable +} + +// Queue is the queue of queueable elements. +type Queue[Q Queueable] struct { log *zap.Logger queueLock sync.RWMutex - queue []*block.Block + queue []Q lastQ uint32 checkBlocks chan struct{} - chain Blockqueuer - relayF func(*block.Block) + chain Queuer[Q] + relayF func(Q) discarded atomic.Bool len int lenUpdateF func(int) cacheSize int mode OperationMode + nilQ Q } -// DefaultCacheSize is the default amount of blocks above the current height +// DefaultCacheSize is the default amount of Queueable elements above the current height // which are stored in the queue. const DefaultCacheSize = 2000 -func (bq *Queue) indexToPosition(i uint32) int { +func (bq *Queue[Q]) indexToPosition(i uint32) int { return int(i) % bq.cacheSize } -// New creates an instance of BlockQueue. 
-func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), cacheSize int, lenMetricsUpdater func(l int), mode OperationMode) *Queue { +// New creates an instance of Queue that handles Queueable elements. +func New[Q Queueable](bc Queuer[Q], log *zap.Logger, relayer func(Q), cacheSize int, lenMetricsUpdater func(l int), mode OperationMode) *Queue[Q] { if log == nil { return nil } if cacheSize <= 0 { cacheSize = DefaultCacheSize } - - return &Queue{ + var nilQ Q + return &Queue[Q]{ log: log, - queue: make([]*block.Block, cacheSize), + queue: make([]Q, cacheSize), checkBlocks: make(chan struct{}, 1), chain: bc, relayF: relayer, lenUpdateF: lenMetricsUpdater, cacheSize: cacheSize, mode: mode, + nilQ: nilQ, } } -// Run runs the BlockQueue queueing loop. It must be called in a separate routine. -func (bq *Queue) Run() { - var lastHeight = bq.chain.BlockHeight() +// Run runs the Queue queueing loop. It must be called in a separate routine. +func (bq *Queue[Q]) Run() { + var lastHeight = bq.chain.Height() for { _, ok := <-bq.checkBlocks if !ok { break } for { - h := bq.chain.BlockHeight() + h := bq.chain.Height() pos := bq.indexToPosition(h + 1) bq.queueLock.Lock() b := bq.queue[pos] - // The chain moved forward using blocks from other sources (consensus). + // The chain moved forward using elements from other sources (consensus). for i := lastHeight; i < h; i++ { old := bq.indexToPosition(i + 1) - if bq.queue[old] != nil && bq.queue[old].Index == i { + if bq.queue[old] != bq.nilQ && bq.queue[old].GetIndex() == i { bq.len-- - bq.queue[old] = nil + bq.queue[old] = bq.nilQ } } bq.queueLock.Unlock() lastHeight = h - if b == nil { + if b == bq.nilQ { break } - err := bq.chain.AddBlock(b) + err := bq.chain.AddItem(b) if err != nil { - // The block might already be added by the consensus. - if bq.chain.BlockHeight() < b.Index { - bq.log.Warn("blockQueue: failed adding block into the blockchain", + // The element might already be added by the consensus. + if bq.chain.Height() < b.GetIndex() { + bq.log.Warn("Queue: failed adding item into the blockchain", zap.String("error", err.Error()), - zap.Uint32("blockHeight", bq.chain.BlockHeight()), - zap.Uint32("nextIndex", b.Index)) + zap.Uint32("height", bq.chain.Height()), + zap.Uint32("nextIndex", b.GetIndex())) } } else if bq.relayF != nil { bq.relayF(b) @@ -115,7 +122,7 @@ func (bq *Queue) Run() { bq.len-- l := bq.len if bq.queue[pos] == b { - bq.queue[pos] = nil + bq.queue[pos] = bq.nilQ } bq.queueLock.Unlock() if bq.lenUpdateF != nil { @@ -125,9 +132,9 @@ func (bq *Queue) Run() { } } -// PutBlock enqueues block to be added to the chain. -func (bq *Queue) PutBlock(block *block.Block) error { - h := bq.chain.BlockHeight() +// Put enqueues Queueable element to be added to the chain. +func (bq *Queue[Q]) Put(element Q) error { + h := bq.chain.Height() bq.queueLock.Lock() defer bq.queueLock.Unlock() if bq.discarded.Load() { @@ -135,10 +142,10 @@ func (bq *Queue) PutBlock(block *block.Block) error { } // Can easily happen when fetching the same blocks from // different peers, thus not considered as error. 
- if block.Index <= h { + if element.GetIndex() <= h { return nil } - if h+uint32(bq.cacheSize) < block.Index { + if h+uint32(bq.cacheSize) < element.GetIndex() { switch bq.mode { case NonBlocking: return nil @@ -151,21 +158,21 @@ func (bq *Queue) PutBlock(block *block.Block) error { bq.queueLock.Lock() return nil } - h = bq.chain.BlockHeight() - if h+uint32(bq.cacheSize) >= block.Index { + h = bq.chain.Height() + if h+uint32(bq.cacheSize) >= element.GetIndex() { bq.queueLock.Lock() break } } } } - pos := bq.indexToPosition(block.Index) - // If we already have it, keep the old block, throw away the new one. - if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index { + pos := bq.indexToPosition(element.GetIndex()) + // If we already have it, keep the old element, throw away the new one. + if bq.queue[pos] == bq.nilQ || bq.queue[pos].GetIndex() < element.GetIndex() { bq.len++ - bq.queue[pos] = block - for pos < bq.cacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index { - bq.lastQ = bq.queue[pos].Index + bq.queue[pos] = element + for pos < bq.cacheSize && bq.queue[pos] != bq.nilQ && bq.lastQ+1 == bq.queue[pos].GetIndex() { + bq.lastQ = bq.queue[pos].GetIndex() pos++ } } @@ -177,21 +184,21 @@ func (bq *Queue) PutBlock(block *block.Block) error { case bq.checkBlocks <- struct{}{}: // ok, signalled to goroutine processing queue default: - // it's already busy processing blocks + // it's already busy processing elements } return nil } -// LastQueued returns the index of the last queued block and the queue's capacity +// LastQueued returns the index of the last queued element and the queue's capacity // left. -func (bq *Queue) LastQueued() (uint32, int) { +func (bq *Queue[Q]) LastQueued() (uint32, int) { bq.queueLock.RLock() defer bq.queueLock.RUnlock() return bq.lastQ, bq.cacheSize - bq.len } -// Discard stops the queue and prevents it from accepting more blocks to enqueue. -func (bq *Queue) Discard() { +// Discard stops the queue and prevents it from accepting more elements to enqueue. 
+func (bq *Queue[Q]) Discard() { if bq.discarded.CompareAndSwap(false, true) { bq.queueLock.Lock() close(bq.checkBlocks) diff --git a/pkg/network/bqueue/queue_test.go b/pkg/network/bqueue/queue_test.go index e481fba56f..c368b8037d 100644 --- a/pkg/network/bqueue/queue_test.go +++ b/pkg/network/bqueue/queue_test.go @@ -13,14 +13,14 @@ import ( func TestBlockQueue(t *testing.T) { chain := fakechain.NewFakeChain() // notice, it's not yet running - bq := New(chain, zaptest.NewLogger(t), nil, 0, nil, NonBlocking) + bq := New(fakechainBlockQueueAdapter{chain}, zaptest.NewLogger(t), nil, 0, nil, NonBlocking) blocks := make([]*block.Block, 11) for i := 1; i < 11; i++ { blocks[i] = &block.Block{Header: block.Header{Index: uint32(i)}} } // not the ones expected currently for i := 3; i < 5; i++ { - assert.NoError(t, bq.PutBlock(blocks[i])) + assert.NoError(t, bq.Put(blocks[i])) } last, capLeft := bq.LastQueued() assert.Equal(t, uint32(0), last) @@ -30,7 +30,7 @@ func TestBlockQueue(t *testing.T) { assert.Equal(t, 2, bq.length()) // now added the expected ones (with duplicates) for i := 1; i < 5; i++ { - assert.NoError(t, bq.PutBlock(blocks[i])) + assert.NoError(t, bq.Put(blocks[i])) } // but they're still not put into the blockchain, because bq isn't running last, capLeft = bq.LastQueued() @@ -39,7 +39,7 @@ func TestBlockQueue(t *testing.T) { assert.Equal(t, uint32(0), chain.BlockHeight()) assert.Equal(t, 4, bq.length()) // block with too big index is dropped - assert.NoError(t, bq.PutBlock(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + DefaultCacheSize + 1}})) + assert.NoError(t, bq.Put(&block.Block{Header: block.Header{Index: bq.chain.Height() + DefaultCacheSize + 1}})) assert.Equal(t, 4, bq.length()) go bq.Run() // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one @@ -51,7 +51,7 @@ func TestBlockQueue(t *testing.T) { assert.Equal(t, uint32(4), chain.BlockHeight()) // put some old blocks for i := 1; i < 5; i++ { - assert.NoError(t, bq.PutBlock(blocks[i])) + assert.NoError(t, bq.Put(blocks[i])) } last, capLeft = bq.LastQueued() assert.Equal(t, uint32(4), last) @@ -59,18 +59,18 @@ func TestBlockQueue(t *testing.T) { assert.Equal(t, 0, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // unexpected blocks with run() active - assert.NoError(t, bq.PutBlock(blocks[8])) + assert.NoError(t, bq.Put(blocks[8])) assert.Equal(t, 1, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) - assert.NoError(t, bq.PutBlock(blocks[7])) + assert.NoError(t, bq.Put(blocks[7])) assert.Equal(t, 2, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // sparse put - assert.NoError(t, bq.PutBlock(blocks[10])) + assert.NoError(t, bq.Put(blocks[10])) assert.Equal(t, 3, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) - assert.NoError(t, bq.PutBlock(blocks[6])) - assert.NoError(t, bq.PutBlock(blocks[5])) + assert.NoError(t, bq.Put(blocks[6])) + assert.NoError(t, bq.Put(blocks[5])) // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one assert.Eventually(t, func() bool { return chain.BlockHeight() == 8 }, 4*time.Second, 100*time.Millisecond) last, capLeft = bq.LastQueued() @@ -83,8 +83,29 @@ func TestBlockQueue(t *testing.T) { } // length wraps len access for tests to make them thread-safe. 
-func (bq *Queue) length() int {
+func (bq *Queue[Q]) length() int {
 	bq.queueLock.Lock()
 	defer bq.queueLock.Unlock()
 	return bq.len
 }
+
+type fakechainBlockQueueAdapter struct {
+	chain *fakechain.FakeChain
+}
+
+func (c fakechainBlockQueueAdapter) AddItem(b *block.Block) error {
+	return c.chain.AddBlock(b)
+}
+
+func (c fakechainBlockQueueAdapter) AddItems(blk ...*block.Block) error {
+	for _, b := range blk {
+		if err := c.chain.AddBlock(b); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (c fakechainBlockQueueAdapter) Height() uint32 {
+	return c.chain.BlockHeight()
+}
diff --git a/pkg/network/bqueue_adapters.go b/pkg/network/bqueue_adapters.go
new file mode 100644
index 0000000000..906978c093
--- /dev/null
+++ b/pkg/network/bqueue_adapters.go
@@ -0,0 +1,53 @@
+package network
+
+import (
+	"github.com/nspcc-dev/neo-go/pkg/core/block"
+	"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
+)
+
+var (
+	_ = (bqueue.Queuer[*block.Block])(&stateSyncBlockQueueAdapter{})
+	_ = (bqueue.Queuer[*block.Block])(&chainBlockQueueAdapter{})
+)
+
+// stateSyncBlockQueueAdapter is a wrapper over the StateSync module that
+// implements the [bqueue.Queuer] interface for operating with [*block.Block].
+type stateSyncBlockQueueAdapter struct {
+	stateSync StateSync
+}
+
+// AddItem implements the [bqueue.Queuer] interface.
+func (s stateSyncBlockQueueAdapter) AddItem(b *block.Block) error {
+	return s.stateSync.AddBlock(b)
+}
+
+// AddItems implements the [bqueue.Queuer] interface.
+func (s stateSyncBlockQueueAdapter) AddItems(blks ...*block.Block) error {
+	panic("AddItems is not implemented for *block.Block")
+}
+
+// Height implements the [bqueue.Queuer] interface.
+func (s stateSyncBlockQueueAdapter) Height() uint32 {
+	return s.stateSync.BlockHeight()
+}
+
+// chainBlockQueueAdapter is a wrapper over the [Ledger] interface that
+// implements the [bqueue.Queuer] interface for operating with [*block.Block].
+type chainBlockQueueAdapter struct {
+	chain Ledger
+}
+
+// AddItem implements the [bqueue.Queuer] interface.
+func (c chainBlockQueueAdapter) AddItem(b *block.Block) error {
+	return c.chain.AddBlock(b)
+}
+
+// AddItems implements the [bqueue.Queuer] interface.
+func (c chainBlockQueueAdapter) AddItems(blk ...*block.Block) error {
+	panic("AddItems is not implemented for *block.Block")
+}
+
+// Height implements the [bqueue.Queuer] interface.
+func (c chainBlockQueueAdapter) Height() uint32 {
+	return c.chain.BlockHeight()
+}
diff --git a/pkg/network/server.go b/pkg/network/server.go
index b8998700c0..be06537f5a 100644
--- a/pkg/network/server.go
+++ b/pkg/network/server.go
@@ -57,9 +57,9 @@ var (
 type (
 	// Ledger is everything Server needs from the blockchain.
 	Ledger interface {
+		blockHeaderQueuer
 		extpool.Ledger
 		mempool.Feer
-		bqueue.Blockqueuer
 		GetBlock(hash util.Uint256) (*block.Block, error)
 		GetConfig() config.Blockchain
 		GetHeader(hash util.Uint256) (*block.Header, error)
@@ -87,6 +87,13 @@ type (
 		Shutdown()
 	}
 
+	// blockHeaderQueuer is a minimal subset of the Ledger interface.
+	blockHeaderQueuer interface {
+		AddBlock(*block.Block) error
+		AddHeaders(...*block.Header) error
+		BlockHeight() uint32
+		HeaderHeight() uint32
+	}
 	// Server represents the local Node in the network. Its transport could
 	// be of any kind.
Server struct { @@ -102,9 +109,9 @@ type ( transports []Transporter discovery Discoverer chain Ledger - bQueue *bqueue.Queue - bSyncQueue *bqueue.Queue - bFetcherQueue *bqueue.Queue + bQueue *bqueue.Queue[*block.Block] + bSyncQueue *bqueue.Queue[*block.Block] + bFetcherQueue *bqueue.Queue[*block.Block] mempool *mempool.Pool notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool @@ -219,18 +226,15 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy }, s.notaryFeer) }) } - s.bQueue = bqueue.New(chain, log, func(b *block.Block) { - s.tryStartServices() - }, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking) + s.bQueue = bqueue.New[*block.Block](chainBlockQueueAdapter{chain}, log, func(b *block.Block) { s.tryStartServices() }, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking) - s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking) + s.bSyncQueue = bqueue.New[*block.Block](stateSyncBlockQueueAdapter{s.stateSync}, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking) if s.NeoFSBlockFetcherCfg.BQueueSize <= 0 { s.NeoFSBlockFetcherCfg.BQueueSize = blockfetcher.DefaultQueueCacheSize } - s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking) + s.bFetcherQueue = bqueue.New[*block.Block](chainBlockQueueAdapter{chain}, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking) var err error - s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, - sync.OnceFunc(func() { close(s.blockFetcherFin) })) + s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.Put, sync.OnceFunc(func() { close(s.blockFetcherFin) })) if err != nil { return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err) } @@ -374,7 +378,7 @@ func (s *Server) addService(svc Service) { } // GetBlockQueue returns the block queue instance managed by Server. -func (s *Server) GetBlockQueue() *bqueue.Queue { +func (s *Server) GetBlockQueue() *bqueue.Queue[*block.Block] { return s.bQueue } @@ -796,9 +800,9 @@ func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { return nil } if s.stateSync.IsActive() { - return s.bSyncQueue.PutBlock(block) + return s.bSyncQueue.Put(block) } - return s.bQueue.PutBlock(block) + return s.bQueue.Put(block) } // handlePing processes a ping request. @@ -825,7 +829,7 @@ func (s *Server) requestBlocksOrHeaders(p Peer) error { return nil } var ( - bq bqueue.Blockqueuer = s.chain + bq blockHeaderQueuer = s.chain requestMPTNodes bool ) if s.stateSync.IsActive() { @@ -1334,7 +1338,7 @@ func (s *Server) handleGetAddrCmd(p Peer) error { // 1. Block range is divided into chunks of payload.MaxHashesCount. // 2. Send requests for chunk in increasing order. // 3. After all requests have been sent, request random height. 
-func (s *Server) requestBlocks(bq bqueue.Blockqueuer, p Peer) error { +func (s *Server) requestBlocks(bq blockHeaderQueuer, p Peer) error { pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock) lq, capLeft := s.bQueue.LastQueued() if capLeft == 0 { diff --git a/pkg/network/state_sync.go b/pkg/network/state_sync.go index b7e5a3d3be..796da8b874 100644 --- a/pkg/network/state_sync.go +++ b/pkg/network/state_sync.go @@ -2,14 +2,13 @@ package network import ( "github.com/nspcc-dev/neo-go/pkg/core/mpt" - "github.com/nspcc-dev/neo-go/pkg/network/bqueue" "github.com/nspcc-dev/neo-go/pkg/util" ) // StateSync represents state sync module. type StateSync interface { + blockHeaderQueuer AddMPTNodes([][]byte) error - bqueue.Blockqueuer Init(currChainHeight uint32) error IsActive() bool IsInitialized() bool From 91b60ea7d33d43ff9a04bef7fdfa56b48cb89b9e Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Thu, 30 Jan 2025 14:27:00 +0300 Subject: [PATCH 2/3] network: integrate state sync module with blockfetcher Close #3574 Signed-off-by: Ekaterina Pavlova --- internal/fakechain/fakechain.go | 13 ++ pkg/config/config.go | 1 + pkg/config/ledger_config.go | 1 + pkg/config/protocol_config.go | 2 + pkg/core/block/header.go | 21 +++ pkg/core/blockchain.go | 13 ++ pkg/core/statesync/module.go | 63 ++++++++- pkg/network/bqueue/queue.go | 7 +- pkg/network/bqueue_adapters.go | 22 ++++ pkg/network/server.go | 84 ++++++++++-- pkg/network/state_sync.go | 4 + pkg/services/blockfetcher/blockfetcher.go | 122 ++++++++++++++---- .../blockfetcher/blockfetcher_test.go | 18 ++- 13 files changed, 327 insertions(+), 44 deletions(-) diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index 6639945d0d..45af63683e 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -452,6 +452,9 @@ func (s *FakeStateSync) Init(currChainHeight uint32) error { // NeedHeaders implements the StateSync interface. func (s *FakeStateSync) NeedHeaders() bool { return s.RequestHeaders.Load() } +// NeedBlocks implements the StateSync interface. +func (s *FakeStateSync) NeedBlocks() bool { return false } + // NeedMPTNodes implements the StateSync interface. func (s *FakeStateSync) NeedMPTNodes() bool { panic("TODO") @@ -469,3 +472,13 @@ func (s *FakeStateSync) Traverse(root util.Uint256, process func(node mpt.Node, func (s *FakeStateSync) GetUnknownMPTNodesBatch(limit int) []util.Uint256 { panic("TODO") } + +// GetConfig implements the StateSync interface. +func (s *FakeStateSync) GetConfig() config.Blockchain { + panic("TODO") +} + +// SetOnStageChanged implements the StateSync interface. 
+func (s *FakeStateSync) SetOnStageChanged(func()) { + panic("TODO") +} diff --git a/pkg/config/config.go b/pkg/config/config.go index ac8bc5d7be..302316e2ae 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -64,6 +64,7 @@ func (c Config) Blockchain() Blockchain { return Blockchain{ ProtocolConfiguration: c.ProtocolConfiguration, Ledger: c.ApplicationConfiguration.Ledger, + NeoFSBlockFetcher: c.ApplicationConfiguration.NeoFSBlockFetcher, } } diff --git a/pkg/config/ledger_config.go b/pkg/config/ledger_config.go index 8b2d1adef1..d7486f56e3 100644 --- a/pkg/config/ledger_config.go +++ b/pkg/config/ledger_config.go @@ -31,4 +31,5 @@ type Ledger struct { type Blockchain struct { ProtocolConfiguration Ledger + NeoFSBlockFetcher } diff --git a/pkg/config/protocol_config.go b/pkg/config/protocol_config.go index c72b8ccdbd..481252f89e 100644 --- a/pkg/config/protocol_config.go +++ b/pkg/config/protocol_config.go @@ -49,6 +49,8 @@ type ( P2PSigExtensions bool `yaml:"P2PSigExtensions"` // P2PStateExchangeExtensions enables additional P2P MPT state data exchange logic. P2PStateExchangeExtensions bool `yaml:"P2PStateExchangeExtensions"` + // NeoFSStateSyncExtensions enables state data exchange logic via NeoFS. + NeoFSStateSyncExtensions bool `yaml:"NeoFSStateSyncExtensions"` // ReservedAttributes allows to have reserved attributes range for experimental or private purposes. ReservedAttributes bool `yaml:"ReservedAttributes"` diff --git a/pkg/core/block/header.go b/pkg/core/block/header.go index b056b0c671..5c5ae03585 100644 --- a/pkg/core/block/header.go +++ b/pkg/core/block/header.go @@ -10,6 +10,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/smartcontract" "github.com/nspcc-dev/neo-go/pkg/util" ) @@ -235,3 +236,23 @@ func (b *Header) UnmarshalJSON(data []byte) error { } return nil } + +// GetExpectedHeaderSize returns the expected Header size with the given number of validators. 
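+// Illustrative example: for 7 validators with StateRootInHeader disabled and
+// assuming smartcontract.GetDefaultHonestNodeCount(7) == 5, the witness part
+// adds (1+1+64)*5 + 2 + 7*(1+1+33) + 2 + 5 = 584 bytes on top of
+// expectedHeaderSizeWithEmptyWitness.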
+func GetExpectedHeaderSize(stateRootInHeader bool, numOfValidators int) int {
+	m := smartcontract.GetDefaultHonestNodeCount(numOfValidators)
+	// expectedHeaderSizeWithEmptyWitness contains 2 bytes for zero-length (new(Header)).Script.Invocation/Verification
+	// InvocationScript:
+	// 64 is the size of the default signature length + 2 bytes length and opcode
+	// 2 = 1 push opcode + 1 length
+	// VerificationScript:
+	// m = 1 byte
+	// 33 = 1 push opcode + 1 length + 33 bytes for public key
+	// n = 1 byte
+	// 5 for SYSCALL
+	size := expectedHeaderSizeWithEmptyWitness + (1+1+64)*m + 2 + numOfValidators*(1+1+33) + 2 + 5
+
+	if stateRootInHeader {
+		size += util.Uint256Size
+	}
+	return size
+}
diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go
index d4faabe7a8..babb69a392 100644
--- a/pkg/core/blockchain.go
+++ b/pkg/core/blockchain.go
@@ -299,6 +299,9 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
 		log.Info("MaxValidUntilBlockIncrement is not set or wrong, using default value",
 			zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement))
 	}
+	if cfg.P2PStateExchangeExtensions && cfg.NeoFSStateSyncExtensions {
+		return nil, errors.New("P2PStateExchangeExtensions and NeoFSStateSyncExtensions cannot be enabled simultaneously")
+	}
 	if cfg.P2PStateExchangeExtensions {
 		if !cfg.StateRootInHeader {
 			return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
@@ -312,6 +315,16 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
 				zap.Int("StateSyncInterval", cfg.StateSyncInterval))
 		}
 	}
+	if cfg.NeoFSStateSyncExtensions {
+		if !cfg.NeoFSBlockFetcher.Enabled {
+			return nil, errors.New("NeoFSStateSyncExtensions are enabled, but NeoFSBlockFetcher is off")
+		}
+		if cfg.StateSyncInterval <= 0 {
+			cfg.StateSyncInterval = defaultStateSyncInterval
+			log.Info("StateSyncInterval is not set or wrong, using default value",
+				zap.Int("StateSyncInterval", cfg.StateSyncInterval))
+		}
+	}
 	if cfg.RemoveUntraceableHeaders && !cfg.RemoveUntraceableBlocks {
 		return nil, errors.New("RemoveUntraceableHeaders is enabled, but RemoveUntraceableBlocks is not")
 	}
diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go
index 0d26a861f4..b129bf67f0 100644
--- a/pkg/core/statesync/module.go
+++ b/pkg/core/statesync/module.go
@@ -93,11 +93,15 @@ type Module struct {
 	billet *mpt.Billet
 
 	jumpCallback func(p uint32) error
+
+	// stageChangedCallback is an optional callback that is triggered whenever
+	// the sync stage changes.
+	stageChangedCallback func()
 }
 
 // NewModule returns new instance of statesync module.
 func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Simple, jumpCallback func(p uint32) error) *Module {
-	if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().Ledger.RemoveUntraceableBlocks) {
+	if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().Ledger.RemoveUntraceableBlocks) && !bc.GetConfig().NeoFSStateSyncExtensions {
 		return &Module{
 			dao: s,
 			bc: bc,
@@ -120,7 +124,13 @@ func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Si
 
 // Init initializes state sync module for the current chain's height with given
 // callback for MPT nodes requests.
func (s *Module) Init(currChainHeight uint32) error { + oldStage := s.syncStage s.lock.Lock() + defer func() { + if s.syncStage != oldStage { + s.notifyStageChanged() + } + }() defer s.lock.Unlock() if s.syncStage != none { @@ -176,6 +186,20 @@ func (s *Module) Init(currChainHeight uint32) error { return s.defineSyncStage() } +// SetOnStageChanged sets callback that is triggered whenever the sync stage changes. +func (s *Module) SetOnStageChanged(cb func()) { + s.lock.Lock() + defer s.lock.Unlock() + s.stageChangedCallback = cb +} + +// notifyStageChanged triggers stage change callback if it's set. +func (s *Module) notifyStageChanged() { + if s.stageChangedCallback != nil { + s.stageChangedCallback() + } +} + // TemporaryPrefix accepts current storage prefix and returns prefix // to use for storing intermediate items during synchronization. func TemporaryPrefix(currPrefix storage.KeyPrefix) storage.KeyPrefix { @@ -287,7 +311,13 @@ func (s *Module) getLatestSavedBlock(p uint32) uint32 { // AddHeaders validates and adds specified headers to the chain. func (s *Module) AddHeaders(hdrs ...*block.Header) error { + oldStage := s.syncStage s.lock.Lock() + defer func() { + if s.syncStage != oldStage { + s.notifyStageChanged() + } + }() defer s.lock.Unlock() if s.syncStage != initialized { @@ -306,7 +336,13 @@ func (s *Module) AddHeaders(hdrs ...*block.Header) error { // AddBlock verifies and saves block skipping executable scripts. func (s *Module) AddBlock(block *block.Block) error { + oldStage := s.syncStage s.lock.Lock() + defer func() { + if s.syncStage != oldStage { + s.notifyStageChanged() + } + }() defer s.lock.Unlock() if s.syncStage&headersSynced == 0 || s.syncStage&blocksSynced != 0 { @@ -359,7 +395,13 @@ func (s *Module) AddBlock(block *block.Block) error { // AddMPTNodes tries to add provided set of MPT nodes to the MPT billet if they are // not yet collected. func (s *Module) AddMPTNodes(nodes [][]byte) error { + oldStage := s.syncStage s.lock.Lock() + defer func() { + if s.syncStage != oldStage { + s.notifyStageChanged() + } + }() defer s.lock.Unlock() if s.syncStage&headersSynced == 0 || s.syncStage&mptSynced != 0 { @@ -425,6 +467,12 @@ func (s *Module) restoreNode(n mpt.Node) error { // If so, then jumping to P state sync point occurs. It is not protected by lock, thus caller // should take care of it. func (s *Module) checkSyncIsCompleted() { + oldStage := s.syncStage + defer func() { + if s.syncStage != oldStage { + s.notifyStageChanged() + } + }() if s.syncStage != headersSynced|mptSynced|blocksSynced { return } @@ -484,6 +532,14 @@ func (s *Module) NeedMPTNodes() bool { return s.syncStage&headersSynced != 0 && s.syncStage&mptSynced == 0 } +// NeedBlocks returns whether the module hasn't completed blocks synchronisation. +func (s *Module) NeedBlocks() bool { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.syncStage&headersSynced != 0 && s.syncStage&blocksSynced == 0 +} + // Traverse traverses local MPT nodes starting from the specified root down to its // children calling `process` for each serialised node until stop condition is satisfied. func (s *Module) Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error { @@ -513,3 +569,8 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 { func (s *Module) HeaderHeight() uint32 { return s.bc.HeaderHeight() } + +// GetConfig returns current blockchain configuration. 
+func (s *Module) GetConfig() config.Blockchain { + return s.bc.GetConfig() +} diff --git a/pkg/network/bqueue/queue.go b/pkg/network/bqueue/queue.go index 38d22f817a..345db58db2 100644 --- a/pkg/network/bqueue/queue.go +++ b/pkg/network/bqueue/queue.go @@ -28,10 +28,15 @@ const ( // Queueable is an interface for a queue element. type Queueable interface { - GetIndex() uint32 + Indexable comparable } +// Indexable is an interface for an element that has an index. +type Indexable interface { + GetIndex() uint32 +} + // Queue is the queue of queueable elements. type Queue[Q Queueable] struct { log *zap.Logger diff --git a/pkg/network/bqueue_adapters.go b/pkg/network/bqueue_adapters.go index 906978c093..f4f4179739 100644 --- a/pkg/network/bqueue_adapters.go +++ b/pkg/network/bqueue_adapters.go @@ -7,6 +7,7 @@ import ( var ( _ = (bqueue.Queuer[*block.Block])(&stateSyncBlockQueueAdapter{}) + _ = (bqueue.Queuer[*block.Header])(&stateSyncHeaderQueueAdapter{}) _ = (bqueue.Queuer[*block.Block])(&chainBlockQueueAdapter{}) ) @@ -31,6 +32,27 @@ func (s stateSyncBlockQueueAdapter) Height() uint32 { return s.stateSync.BlockHeight() } +// stateSyncHeaderQueueAdapter is a wrapper over StateSync module that +// implements the [bqueue.Queuer] interface for operating with [*block.Header]. +type stateSyncHeaderQueueAdapter struct { + stateSync StateSync +} + +// AddItem implements the [bqueue.Queuer] interface. +func (s stateSyncHeaderQueueAdapter) AddItem(h *block.Header) error { + return s.stateSync.AddHeaders(h) +} + +// AddItems implements the [bqueue.Queuer] interface. +func (s stateSyncHeaderQueueAdapter) AddItems(h ...*block.Header) error { + return s.stateSync.AddHeaders(h...) +} + +// Height implements the [bqueue.Queuer] interface. +func (s stateSyncHeaderQueueAdapter) Height() uint32 { + return s.stateSync.HeaderHeight() +} + // chainBlockQueueAdapter is a wrapper over the [Ledger] interface that // implements the [bqueue.Queuer] interface for operating with [*block.Block]. 
type chainBlockQueueAdapter struct { diff --git a/pkg/network/server.go b/pkg/network/server.go index be06537f5a..848b4f0c7b 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -111,11 +111,15 @@ type ( chain Ledger bQueue *bqueue.Queue[*block.Block] bSyncQueue *bqueue.Queue[*block.Block] + syncHFetcherQueue *bqueue.Queue[*block.Header] + syncBFetcherQueue *bqueue.Queue[*block.Block] bFetcherQueue *bqueue.Queue[*block.Block] mempool *mempool.Pool notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer + syncHeaderFetcher *blockfetcher.Service + syncBlockFetcher *blockfetcher.Service blockFetcher *blockfetcher.Service serviceLock sync.RWMutex @@ -234,10 +238,29 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy } s.bFetcherQueue = bqueue.New[*block.Block](chainBlockQueueAdapter{chain}, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking) var err error - s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.Put, sync.OnceFunc(func() { close(s.blockFetcherFin) })) + s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, func(item bqueue.Indexable) error { return s.bFetcherQueue.Put(item.(*block.Block)) }, sync.OnceFunc(func() { close(s.blockFetcherFin) }), blockfetcher.Blocks) if err != nil { return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err) } + s.syncHFetcherQueue = bqueue.New[*block.Header](stateSyncHeaderQueueAdapter{s.stateSync}, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking) + s.syncHeaderFetcher, err = blockfetcher.New(s.stateSync, s.NeoFSBlockFetcherCfg, log, func(item bqueue.Indexable) error { return s.syncHFetcherQueue.Put(item.(*block.Header)) }, + func() { + s.log.Info("NeoFS HeaderFetcher finished state sync headers downloading") + }, blockfetcher.Headers) + if err != nil { + return nil, fmt.Errorf("failed to create Sync NeoFS HeaderFetcher: %w", err) + } + s.syncBFetcherQueue = bqueue.New[*block.Block](stateSyncBlockQueueAdapter{s.stateSync}, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking) + s.syncBlockFetcher, err = blockfetcher.New(s.stateSync, s.NeoFSBlockFetcherCfg, log, func(item bqueue.Indexable) error { return s.syncBFetcherQueue.Put(item.(*block.Block)) }, + sync.OnceFunc(func() { + s.log.Info("NeoFS BlockFetcher finished state sync block downloading") + }), blockfetcher.Blocks) + if err != nil { + return nil, fmt.Errorf("failed to create Sync NeoFS BlockFetcher: %w", err) + } + if s.config.NeoFSStateSyncExtensions { + s.stateSync.SetOnStageChanged(s.stateSyncCallBack) + } if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value", @@ -314,10 +337,11 @@ func (s *Server) Start() { go s.relayBlocksLoop() go s.bQueue.Run() go s.bSyncQueue.Run() + go s.syncBFetcherQueue.Run() + go s.syncHFetcherQueue.Run() go s.bFetcherQueue.Run() - if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { - err := s.blockFetcher.Start() - if err != nil { + if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled && !s.config.NeoFSStateSyncExtensions && !s.config.P2PStateExchangeExtensions { + if err := s.blockFetcher.Start(); err != nil { s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err)) } } @@ -335,10 +359,12 @@ func (s *Server) Shutdown() { return } s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) - if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { - s.bFetcherQueue.Discard() - 
s.blockFetcher.Shutdown() - } + s.syncHFetcherQueue.Discard() + s.syncBFetcherQueue.Discard() + s.bFetcherQueue.Discard() + s.syncHeaderFetcher.Shutdown() + s.syncBlockFetcher.Shutdown() + s.blockFetcher.Shutdown() for _, tr := range s.transports { tr.Close() } @@ -365,6 +391,38 @@ func (s *Server) Shutdown() { _ = s.log.Sync() } +func (s *Server) stateSyncCallBack() { + needHeaders := s.stateSync.NeedHeaders() + needBlocks := s.stateSync.NeedBlocks() + isActive := s.stateSync.IsActive() + if needHeaders { + if !s.syncHeaderFetcher.IsShutdown() { + err := s.syncHeaderFetcher.Start() + if err != nil { + s.log.Error("skipping NeoFS Sync HeaderFetcher", zap.Error(err)) + } + } + } + if needBlocks { + s.syncHeaderFetcher.Shutdown() + if !s.syncBlockFetcher.IsShutdown() { + if err := s.syncBlockFetcher.Start(); err != nil { + s.log.Error("skipping NeoFS Sync BlockFetcher", zap.Error(err)) + } + } + } + if !needHeaders && !needBlocks { + s.syncBlockFetcher.Shutdown() + } + if !isActive { + if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { + if err := s.blockFetcher.Start(); err != nil { + s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err)) + } + } + } +} + // AddService allows to add a service to be started/stopped by Server. func (s *Server) AddService(svc Service) { s.serviceLock.Lock() @@ -796,7 +854,7 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { // handleBlockCmd processes the block received from its peer. func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { - if s.blockFetcher.IsActive() { + if s.syncHeaderFetcher.IsActive() || s.syncBlockFetcher.IsActive() || s.blockFetcher.IsActive() { return nil } if s.stateSync.IsActive() { @@ -819,7 +877,7 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error { } func (s *Server) requestBlocksOrHeaders(p Peer) error { - if s.blockFetcher.IsActive() { + if s.syncHeaderFetcher.IsActive() || s.syncBlockFetcher.IsActive() || s.blockFetcher.IsActive() { return nil } if s.stateSync.NeedHeaders() { @@ -1140,7 +1198,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error // handleHeadersCmd processes headers payload. func (s *Server) handleHeadersCmd(p Peer, h *payload.Headers) error { - if s.blockFetcher.IsActive() { + if s.syncHeaderFetcher.IsActive() || s.syncBlockFetcher.IsActive() || s.blockFetcher.IsActive() { return nil } return s.stateSync.AddHeaders(h.Hdrs...) @@ -1477,6 +1535,8 @@ func (s *Server) tryInitStateSync() { } if !s.stateSync.IsActive() { s.bSyncQueue.Discard() + s.syncBFetcherQueue.Discard() + s.syncHFetcherQueue.Discard() return } @@ -1508,6 +1568,8 @@ func (s *Server) tryInitStateSync() { // module can be inactive after init (i.e. 
full state is collected and ordinary block processing is needed) if !s.stateSync.IsActive() { s.bSyncQueue.Discard() + s.syncBFetcherQueue.Discard() + s.syncHFetcherQueue.Discard() } } } diff --git a/pkg/network/state_sync.go b/pkg/network/state_sync.go index 796da8b874..c20ee3a0df 100644 --- a/pkg/network/state_sync.go +++ b/pkg/network/state_sync.go @@ -1,6 +1,7 @@ package network import ( + "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/util" ) @@ -13,7 +14,10 @@ type StateSync interface { IsActive() bool IsInitialized() bool GetUnknownMPTNodesBatch(limit int) []util.Uint256 + GetConfig() config.Blockchain NeedHeaders() bool + NeedBlocks() bool NeedMPTNodes() bool + SetOnStageChanged(func()) Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error } diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go index 854827c6eb..308edeb9c8 100644 --- a/pkg/services/blockfetcher/blockfetcher.go +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -15,6 +15,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" gio "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/network/bqueue" "github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neofs-sdk-go/client" @@ -27,6 +28,17 @@ import ( "go.uber.org/zap" ) +// OperationMode is an enum that denotes the operation mode of the Fetcher. +// It can be either Blocks or Headers. +type OperationMode byte + +const ( + // Blocks denotes that the Fetcher is working with blocks. + Blocks OperationMode = iota + // Headers denotes that the Fetcher is working with headers. + Headers +) + const ( // DefaultQueueCacheSize is the default size of the queue cache. DefaultQueueCacheSize = 16000 @@ -36,6 +48,7 @@ const ( type Ledger interface { GetConfig() config.Blockchain BlockHeight() uint32 + HeaderHeight() uint32 } // poolWrapper wraps a NeoFS pool to adapt its Close method to return an error. @@ -52,15 +65,20 @@ func (p poolWrapper) Close() error { // Service is a service that fetches blocks from NeoFS. type Service struct { // isActive denotes whether the service is working or in the process of shutdown. - isActive atomic.Bool - log *zap.Logger - cfg config.NeoFSBlockFetcher + isActive atomic.Bool + isShutdown atomic.Bool + log *zap.Logger + cfg config.NeoFSBlockFetcher + operationMode OperationMode + stateRootInHeader bool + // headerSize is the size of the header in bytes. + headerSize int - chain Ledger - pool poolWrapper - enqueueBlock func(*block.Block) error - account *wallet.Account + chain Ledger + pool poolWrapper + enqueue func(obj bqueue.Indexable) error + account *wallet.Account oidsCh chan oid.ID // wg is a wait group for block downloaders. @@ -78,10 +96,15 @@ type Service struct { oidDownloaderToExiter chan struct{} shutdownCallback func() + + // Depends on the OperationMode, the following functions are set to the appropriate functions. + getFunc func(ctx context.Context, oid string) (io.ReadCloser, error) + readFunc func(rc io.ReadCloser) (any, error) + heightFunc func() uint32 } // New creates a new BlockFetcher Service. 
-func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, shutdownCallback func()) (*Service, error) { +func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put func(item bqueue.Indexable) error, shutdownCallback func(), opt OperationMode) (*Service, error) { var ( account *wallet.Account err error @@ -137,12 +160,14 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc return nil, err } return &Service{ - chain: chain, - pool: poolWrapper{Pool: p}, - log: logger, - cfg: cfg, - - enqueueBlock: putBlock, + chain: chain, + pool: poolWrapper{Pool: p}, + log: logger, + cfg: cfg, + operationMode: opt, + headerSize: getHeaderSize(chain.GetConfig()), + + enqueue: put, account: account, stateRootInHeader: chain.GetConfig().StateRootInHeader, shutdownCallback: shutdownCallback, @@ -160,8 +185,15 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc }, nil } +func getHeaderSize(chain config.Blockchain) int { + return block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(0)) +} + // Start runs the NeoFS BlockFetcher service. func (bfs *Service) Start() error { + if bfs.IsShutdown() { + return errors.New("service is already shut down") + } if !bfs.isActive.CompareAndSwap(false, true) { return nil } @@ -196,6 +228,16 @@ func (bfs *Service) Start() error { bfs.isActive.CompareAndSwap(true, false) return fmt.Errorf("container magic mismatch: expected %d, got %s", bfs.chain.GetConfig().Magic, containerMagic) } + + bfs.getFunc = bfs.objectGet + bfs.readFunc = bfs.readBlock + bfs.heightFunc = bfs.chain.BlockHeight + if bfs.operationMode == Headers { + bfs.getFunc = bfs.objectGetRange + bfs.readFunc = bfs.readHeader + bfs.heightFunc = bfs.chain.HeaderHeight + } + // Start routine that manages Service shutdown process. go bfs.exiter() @@ -239,22 +281,22 @@ func (bfs *Service) blockDownloader() { ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) defer cancel() - rc, err := bfs.objectGet(ctx, blkOid.String()) + rc, err := bfs.getFunc(ctx, blkOid.String()) if err != nil { if isContextCanceledErr(err) { return } - bfs.log.Error("failed to objectGet block", zap.String("oid", blkOid.String()), zap.Error(err)) + bfs.log.Error("failed to get object", zap.String("oid", blkOid.String()), zap.Error(err)) bfs.stopService(true) return } - b, err := bfs.readBlock(rc) + obj, err := bfs.readFunc(rc) if err != nil { if isContextCanceledErr(err) { return } - bfs.log.Error("failed to decode block from stream", zap.String("oid", blkOid.String()), zap.Error(err)) + bfs.log.Error("failed to decode object from stream", zap.String("oid", blkOid.String()), zap.Error(err)) bfs.stopService(true) return } @@ -262,9 +304,9 @@ func (bfs *Service) blockDownloader() { case <-bfs.ctx.Done(): return default: - err = bfs.enqueueBlock(b) + err = bfs.enqueue(obj.(bqueue.Indexable)) if err != nil { - bfs.log.Error("failed to enqueue block", zap.Uint32("index", b.Index), zap.Error(err)) + bfs.log.Error("failed to enqueue object", zap.String("oid", blkOid.String()), zap.Error(err)) bfs.stopService(true) return } @@ -274,7 +316,7 @@ func (bfs *Service) blockDownloader() { // fetchOIDsFromIndexFiles fetches block OIDs from NeoFS by searching index files first. 
func (bfs *Service) fetchOIDsFromIndexFiles() error { - h := bfs.chain.BlockHeight() + h := bfs.heightFunc() startIndex := h / bfs.cfg.IndexFileSize skip := h % bfs.cfg.IndexFileSize @@ -368,7 +410,7 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error { // fetchOIDsBySearch fetches block OIDs from NeoFS by searching through the Block objects. func (bfs *Service) fetchOIDsBySearch() error { - startIndex := bfs.chain.BlockHeight() + startIndex := bfs.heightFunc() //We need to search with EQ filter to avoid partially-completed SEARCH responses. batchSize := uint32(neofs.DefaultSearchBatchSize) @@ -413,7 +455,7 @@ func (bfs *Service) fetchOIDsBySearch() error { } // readBlock decodes the block from the read closer and prepares it for adding to the blockchain. -func (bfs *Service) readBlock(rc io.ReadCloser) (*block.Block, error) { +func (bfs *Service) readBlock(rc io.ReadCloser) (any, error) { b := block.New(bfs.stateRootInHeader) r := gio.NewBinReaderFromIO(rc) b.DecodeBinary(r) @@ -421,11 +463,20 @@ func (bfs *Service) readBlock(rc io.ReadCloser) (*block.Block, error) { return b, r.Err } +// readHeader decodes the header from the read closer and prepares it for adding to the blockchain. +func (bfs *Service) readHeader(rc io.ReadCloser) (any, error) { + b := block.New(bfs.stateRootInHeader) + r := gio.NewBinReaderFromIO(rc) + b.Header.DecodeBinary(r) + rc.Close() + return &b.Header, r.Err +} + // Shutdown stops the NeoFS BlockFetcher service. It prevents service from new // block OIDs search, cancels all in-progress downloading operations and waits // until all service routines finish their work. func (bfs *Service) Shutdown() { - if !bfs.IsActive() { + if !bfs.IsActive() || bfs.IsShutdown() { return } bfs.stopService(true) @@ -444,6 +495,9 @@ func (bfs *Service) stopService(force bool) { // exiter is a routine that is listening to a quitting signal and manages graceful // Service shutdown process. func (bfs *Service) exiter() { + if !bfs.isActive.Load() { + return + } // Closing signal may come from anyone, but only once. force := <-bfs.quit bfs.log.Info("shutting down NeoFS BlockFetcher service", @@ -451,6 +505,7 @@ func (bfs *Service) exiter() { ) bfs.isActive.CompareAndSwap(true, false) + bfs.isShutdown.CompareAndSwap(false, true) // Cansel all pending OIDs/blocks downloads in case if shutdown requested by user // or caused by downloading error. if force { @@ -477,6 +532,12 @@ func (bfs *Service) exiter() { close(bfs.exiterToShutdown) } +// IsShutdown returns true if the NeoFS BlockFetcher service is completely shutdown. +// The service can not be started again. +func (bfs *Service) IsShutdown() bool { + return bfs.isShutdown.Load() +} + // IsActive returns true if the NeoFS BlockFetcher service is running. 
func (bfs *Service) IsActive() bool { return bfs.isActive.Load() @@ -533,6 +594,19 @@ func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, e return rc, err } +func (bfs *Service) objectGetRange(ctx context.Context, oid string) (io.ReadCloser, error) { + u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, bfs.headerSize)) + if err != nil { + return nil, err + } + var rc io.ReadCloser + err = bfs.retry(func() error { + rc, err = neofs.GetWithClient(ctx, bfs.pool, bfs.account.PrivateKey(), u, false) + return err + }) + return rc, err +} + func (bfs *Service) objectSearch(ctx context.Context, prm client.PrmObjectSearch) ([]oid.ID, error) { var ( oids []oid.ID diff --git a/pkg/services/blockfetcher/blockfetcher_test.go b/pkg/services/blockfetcher/blockfetcher_test.go index 970a5d456e..5dc1b43a9d 100644 --- a/pkg/services/blockfetcher/blockfetcher_test.go +++ b/pkg/services/blockfetcher/blockfetcher_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/nspcc-dev/neo-go/pkg/config" - "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/network/bqueue" "github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -14,6 +14,10 @@ type mockLedger struct { height uint32 } +func (m *mockLedger) HeaderHeight() uint32 { + return m.height +} + func (m *mockLedger) GetConfig() config.Blockchain { return config.Blockchain{} } @@ -26,7 +30,7 @@ type mockPutBlockFunc struct { putCalled bool } -func (m *mockPutBlockFunc) putBlock(b *block.Block) error { +func (m *mockPutBlockFunc) putBlock(b bqueue.Indexable) error { m.putCalled = true return nil } @@ -46,7 +50,7 @@ func TestServiceConstructor(t *testing.T) { OIDBatchSize: 0, DownloaderWorkersCount: 0, } - _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback, Blocks) require.Error(t, err) }) @@ -57,7 +61,7 @@ func TestServiceConstructor(t *testing.T) { }, Addresses: []string{}, } - _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback, Blocks) require.Error(t, err) }) @@ -69,7 +73,7 @@ func TestServiceConstructor(t *testing.T) { Addresses: []string{"localhost:8080"}, BQueueSize: DefaultQueueCacheSize, } - service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback, Blocks) require.NoError(t, err) require.NotNil(t, service) @@ -87,7 +91,7 @@ func TestServiceConstructor(t *testing.T) { }, Addresses: []string{"localhost:1"}, } - service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback, Blocks) require.NoError(t, err) err = service.Start() require.Error(t, err) @@ -106,7 +110,7 @@ func TestServiceConstructor(t *testing.T) { }, }, } - _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback, Blocks) require.Error(t, err) require.Contains(t, err.Error(), "open wallet: open invalid/path/to/wallet.json:") }) From 1ad62ef5f513019a150c6182a3c852f883f1c84a Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Wed, 29 Jan 2025 20:00:15 +0300 Subject: [PATCH 3/3] blockfetcher: add headerSizeMap to GetRange headers accordingly Signed-off-by: Ekaterina Pavlova 
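
Headers of different heights may have different expected sizes once the
committee size changes, so a single precomputed header size is not enough
for ranged header requests. The service now builds a height -> expected
size map and picks the entry with the greatest height not exceeding the
requested header's height. Roughly (a hypothetical standalone form of the
lookup performed in objectGetRange, shown for illustration only):

    // nearestSize returns the expected header size recorded for the
    // greatest height that does not exceed h.
    func nearestSize(sizes map[int]int, h int) int {
        best := 0
        for height := range sizes {
            if height <= h && height > best {
                best = height
            }
        }
        return sizes[best]
    }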
--- pkg/services/blockfetcher/blockfetcher.go | 61 ++++++++++++++++------- 1 file changed, 43 insertions(+), 18 deletions(-) diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go index 308edeb9c8..daccec2998 100644 --- a/pkg/services/blockfetcher/blockfetcher.go +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -62,6 +62,11 @@ func (p poolWrapper) Close() error { return nil } +type indexedOID struct { + Index int + OID oid.ID +} + // Service is a service that fetches blocks from NeoFS. type Service struct { // isActive denotes whether the service is working or in the process of shutdown. @@ -72,15 +77,15 @@ type Service struct { operationMode OperationMode stateRootInHeader bool - // headerSize is the size of the header in bytes. - headerSize int + // headerSizeMap is a map of height to expected header size. + headerSizeMap map[int]int chain Ledger pool poolWrapper enqueue func(obj bqueue.Indexable) error account *wallet.Account - oidsCh chan oid.ID + oidsCh chan indexedOID // wg is a wait group for block downloaders. wg sync.WaitGroup @@ -98,7 +103,7 @@ type Service struct { shutdownCallback func() // Depends on the OperationMode, the following functions are set to the appropriate functions. - getFunc func(ctx context.Context, oid string) (io.ReadCloser, error) + getFunc func(ctx context.Context, oid string, index int) (io.ReadCloser, error) readFunc func(rc io.ReadCloser) (any, error) heightFunc func() uint32 } @@ -165,7 +170,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun log: logger, cfg: cfg, operationMode: opt, - headerSize: getHeaderSize(chain.GetConfig()), + headerSizeMap: getHeaderSizeMap(chain.GetConfig()), enqueue: put, account: account, @@ -181,12 +186,17 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun // * first full block of OIDs is processing by Downloader // * second full block of OIDs is available to be fetched by Downloader immediately // * third half-filled block of OIDs is being collected by OIDsFetcher. - oidsCh: make(chan oid.ID, 2*cfg.OIDBatchSize), + oidsCh: make(chan indexedOID, 2*cfg.OIDBatchSize), }, nil } -func getHeaderSize(chain config.Blockchain) int { - return block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(0)) +func getHeaderSizeMap(chain config.Blockchain) map[int]int { + headerSizeMap := make(map[int]int) + headerSizeMap[0] = block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(0)) + for height := range chain.CommitteeHistory { + headerSizeMap[int(height)] = block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(height)) + } + return headerSizeMap } // Start runs the NeoFS BlockFetcher service. 
@@ -277,11 +287,13 @@ func (bfs *Service) oidDownloader() { func (bfs *Service) blockDownloader() { defer bfs.wg.Done() - for blkOid := range bfs.oidsCh { + for indexedOid := range bfs.oidsCh { + index := indexedOid.Index + blkOid := indexedOid.OID ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) defer cancel() - rc, err := bfs.getFunc(ctx, blkOid.String()) + rc, err := bfs.getFunc(ctx, blkOid.String(), index) if err != nil { if isContextCanceledErr(err) { return @@ -347,7 +359,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error { blockCtx, blockCancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) defer blockCancel() - oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String()) + oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String(), -1) if err != nil { if isContextCanceledErr(err) { return nil @@ -355,7 +367,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error { return fmt.Errorf("failed to fetch '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err) } - err = bfs.streamBlockOIDs(oidsRC, int(skip)) + err = bfs.streamBlockOIDs(oidsRC, int(startIndex), int(skip)) if err != nil { if isContextCanceledErr(err) { return nil @@ -370,7 +382,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error { } // streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel. -func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error { +func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, startIndex, skip int) error { defer rc.Close() oidBytes := make([]byte, oid.Size) oidsProcessed := 0 @@ -397,7 +409,7 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error { select { case <-bfs.exiterToOIDDownloader: return nil - case bfs.oidsCh <- oidBlock: + case bfs.oidsCh <- indexedOID{Index: startIndex*int(bfs.cfg.IndexFileSize) + oidsProcessed, OID: oidBlock}: } oidsProcessed++ @@ -442,12 +454,14 @@ func (bfs *Service) fetchOIDsBySearch() error { bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no block found with index %d, stopping", startIndex)) return nil } + index := int(startIndex) for _, oid := range blockOids { select { case <-bfs.exiterToOIDDownloader: return nil - case bfs.oidsCh <- oid: + case bfs.oidsCh <- indexedOID{Index: index, OID: oid}: } + index++ //Won't work properly if neofs.ObjectSearch results are not ordered. 
} startIndex += batchSize } @@ -581,7 +595,7 @@ func (bfs *Service) retry(action func() error) error { return err } -func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) { +func (bfs *Service) objectGet(ctx context.Context, oid string, index int) (io.ReadCloser, error) { u, err := url.Parse(fmt.Sprintf("%s:%s/%s", neofs.URIScheme, bfs.cfg.ContainerID, oid)) if err != nil { return nil, err @@ -594,8 +608,19 @@ func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, e return rc, err } -func (bfs *Service) objectGetRange(ctx context.Context, oid string) (io.ReadCloser, error) { - u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, bfs.headerSize)) +func (bfs *Service) objectGetRange(ctx context.Context, oid string, height int) (io.ReadCloser, error) { + nearestHeight := 0 + for h := range bfs.headerSizeMap { + if h <= height && h > nearestHeight { + nearestHeight = h + } + if nearestHeight >= height { + break + } + } + + size := bfs.headerSizeMap[nearestHeight] + u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, size)) if err != nil { return nil, err }
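
For reference, a minimal self-contained sketch of how the generic queue from
the first patch is meant to be used: a consumer only has to satisfy
bqueue.Queuer[*block.Block]. The toyChain type below is purely illustrative
and stands in for a real Ledger or StateSync implementation wrapped by the
adapters above.

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/nspcc-dev/neo-go/pkg/core/block"
	"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
	"go.uber.org/zap"
)

// toyChain is an illustrative consumer satisfying bqueue.Queuer[*block.Block].
type toyChain struct {
	height atomic.Uint32
}

// AddItem accepts the next block and advances the height.
func (c *toyChain) AddItem(b *block.Block) error {
	c.height.Store(b.Index)
	return nil
}

// AddItems accepts a batch of blocks one by one.
func (c *toyChain) AddItems(blocks ...*block.Block) error {
	for _, b := range blocks {
		if err := c.AddItem(b); err != nil {
			return err
		}
	}
	return nil
}

// Height returns the index of the last accepted block.
func (c *toyChain) Height() uint32 {
	return c.height.Load()
}

func main() {
	chain := &toyChain{}
	q := bqueue.New[*block.Block](chain, zap.NewNop(), nil, bqueue.DefaultCacheSize, nil, bqueue.NonBlocking)
	go q.Run()
	if err := q.Put(&block.Block{Header: block.Header{Index: 1}}); err != nil {
		panic(err)
	}
	last, capLeft := q.LastQueued()
	fmt.Println(last, capLeft)
	q.Discard()
}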