Skip to content

Commit

Permalink
network: integrate state sync module with blockfetcher
Browse files Browse the repository at this point in the history
Close #3574

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Jan 28, 2025
1 parent 44b29e6 commit 5e284a3
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 14 deletions.
17 changes: 17 additions & 0 deletions internal/fakechain/fakechain.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,11 @@ func (s *FakeStateSync) BlockHeight() uint32 {
return 0
}

// HeaderHeight implements the StateSync interface.
func (s *FakeStateSync) HeaderHeight() uint32 {
return 0
}

// IsActive implements the StateSync interface.
func (s *FakeStateSync) IsActive() bool { return s.IsActiveFlag.Load() }

Expand All @@ -447,6 +452,8 @@ func (s *FakeStateSync) Init(currChainHeight uint32) error {
// NeedHeaders implements the StateSync interface.
func (s *FakeStateSync) NeedHeaders() bool { return s.RequestHeaders.Load() }

func (s *FakeStateSync) NeedBlocks() bool { return false }

// NeedMPTNodes implements the StateSync interface.
func (s *FakeStateSync) NeedMPTNodes() bool {
panic("TODO")
Expand All @@ -464,3 +471,13 @@ func (s *FakeStateSync) Traverse(root util.Uint256, process func(node mpt.Node,
func (s *FakeStateSync) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {
panic("TODO")
}

// GetConfig implements the StateSync interface.
func (s *FakeStateSync) GetConfig() config.Blockchain {
panic("TODO")
}

// SetOnStageChanged implements the StateSync interface.
func (s *FakeStateSync) SetOnStageChanged(func()) {
panic("TODO")
}
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (c Config) Blockchain() Blockchain {
return Blockchain{
ProtocolConfiguration: c.ProtocolConfiguration,
Ledger: c.ApplicationConfiguration.Ledger,
NeoFSBlockFetcher: c.ApplicationConfiguration.NeoFSBlockFetcher,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/config/ledger_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ type Ledger struct {
type Blockchain struct {
ProtocolConfiguration
Ledger
NeoFSBlockFetcher
}
2 changes: 2 additions & 0 deletions pkg/config/protocol_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type (
P2PSigExtensions bool `yaml:"P2PSigExtensions"`
// P2PStateExchangeExtensions enables additional P2P MPT state data exchange logic.
P2PStateExchangeExtensions bool `yaml:"P2PStateExchangeExtensions"`
// NeoFSStateSyncExtensions enables additional MPT state data exchange logic via NeoFS.
NeoFSStateSyncExtensions bool `yaml:"NeoFSStateSyncExtensions"`
// ReservedAttributes allows to have reserved attributes range for experimental or private purposes.
ReservedAttributes bool `yaml:"ReservedAttributes"`

Expand Down
10 changes: 10 additions & 0 deletions pkg/core/block/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ func (b *Header) GetIndex() uint32 {
return b.Index
}

// GetExpectedHeaderSize returns the expected header size with empty witness.
func (b *Header) GetExpectedHeaderSize() int {
size := expectedHeaderSizeWithEmptyWitness - 1 - 1 + // 1 is for the zero-length (new(Header)).Script.Invocation/Verification
io.GetVarSize(&b.Script)
if b.StateRootEnabled {
size += util.Uint256Size
}
return size
}

// Hash returns the hash of the block. Notice that it is cached internally,
// so no matter how you change the [Header] after the first invocation of this
// method it won't change. To get an updated hash in case you're changing
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement))
}
if cfg.P2PStateExchangeExtensions {
if !cfg.StateRootInHeader {
return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
if !cfg.StateRootInHeader && !cfg.NeoFSStateSyncExtensions {
return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader or NeoFSStateSyncExtensions are off")
}
if cfg.KeepOnlyLatestState && !cfg.RemoveUntraceableBlocks {
return nil, errors.New("P2PStateExchangeExtensions can be enabled either on MPT-complete node (KeepOnlyLatestState=false) or on light GC-enabled node (RemoveUntraceableBlocks=true)")
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/blockchain_core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestNewBlockchainIncosistencies(t *testing.T) {
t.Run("state exchange without state root", func(t *testing.T) {
checkNewBlockchainErr(t, func(c *config.Config) {
c.ProtocolConfiguration.P2PStateExchangeExtensions = true
}, storage.NewMemoryStore(), "P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
}, storage.NewMemoryStore(), "P2PStatesExchangeExtensions are enabled, but StateRootInHeader or NeoFSStateSyncExtensions are off")
})
}

Expand Down
67 changes: 63 additions & 4 deletions pkg/core/statesync/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ type Module struct {
billet *mpt.Billet

jumpCallback func(p uint32) error

// stageChangedCallback is an optional callback that is triggered whenever
// the sync stage changes.
stageChangedCallback func()
}

func (s *Module) AddItem(b bqueue.Queueable) error {
Expand Down Expand Up @@ -139,9 +143,14 @@ func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Si
// Init initializes state sync module for the current chain's height with given
// callback for MPT nodes requests.
func (s *Module) Init(currChainHeight uint32) error {
oldStage := s.syncStage
s.lock.Lock()
defer s.lock.Unlock()

defer func() {
if s.syncStage != oldStage {
go s.notifyStageChanged()
}
}()
if s.syncStage != none {
return errors.New("already initialized or inactive")
}
Expand Down Expand Up @@ -195,6 +204,20 @@ func (s *Module) Init(currChainHeight uint32) error {
return s.defineSyncStage()
}

// SetOnStageChanged sets callback that is triggered whenever the sync stage changes.
func (s *Module) SetOnStageChanged(cb func()) {
s.lock.Lock()
defer s.lock.Unlock()
s.stageChangedCallback = cb
}

// notifyStageChanged triggers stage callback if it's set.
func (s *Module) notifyStageChanged() {
if s.stageChangedCallback != nil {
s.stageChangedCallback()
}
}

// TemporaryPrefix accepts current storage prefix and returns prefix
// to use for storing intermediate items during synchronization.
func TemporaryPrefix(currPrefix storage.KeyPrefix) storage.KeyPrefix {
Expand All @@ -211,6 +234,12 @@ func TemporaryPrefix(currPrefix storage.KeyPrefix) storage.KeyPrefix {
// defineSyncStage sequentially checks and sets sync state process stage after Module
// initialization. It also performs initialization of MPT Billet if necessary.
func (s *Module) defineSyncStage() error {
oldStage := s.syncStage
defer func() {
if s.syncStage != oldStage {
go s.notifyStageChanged()
}
}()
// check headers sync stage first
ltstHeaderHeight := s.bc.HeaderHeight()
if ltstHeaderHeight > s.syncPoint {
Expand Down Expand Up @@ -330,6 +359,7 @@ func (s *Module) AddHeaders(hdrs ...*block.Header) error {

// AddBlock verifies and saves block skipping executable scripts.
func (s *Module) AddBlock(block *block.Block) error {
oldStage := s.syncStage
s.lock.Lock()
defer s.lock.Unlock()

Expand Down Expand Up @@ -375,6 +405,9 @@ func (s *Module) AddBlock(block *block.Block) error {
s.syncStage |= blocksSynced
s.log.Info("blocks are in sync",
zap.Uint32("blockHeight", s.blockHeight))
if s.syncStage != oldStage {
go s.notifyStageChanged()
}
s.checkSyncIsCompleted()
}
return nil
Expand All @@ -385,6 +418,7 @@ func (s *Module) AddBlock(block *block.Block) error {
func (s *Module) AddMPTNodes(nodes [][]byte) error {
s.lock.Lock()
defer s.lock.Unlock()
oldStage := s.syncStage

if s.syncStage&headersSynced == 0 || s.syncStage&mptSynced != 0 {
return errors.New("MPT nodes were not requested")
Expand All @@ -406,6 +440,9 @@ func (s *Module) AddMPTNodes(nodes [][]byte) error {
s.syncStage |= mptSynced
s.log.Info("MPT is in sync",
zap.Uint32("height", s.syncPoint))
if s.syncStage != oldStage {
go s.notifyStageChanged()
}
s.checkSyncIsCompleted()
}
return nil
Expand Down Expand Up @@ -449,6 +486,12 @@ func (s *Module) restoreNode(n mpt.Node) error {
// If so, then jumping to P state sync point occurs. It is not protected by lock, thus caller
// should take care of it.
func (s *Module) checkSyncIsCompleted() {
oldStage := s.syncStage
defer func() {
if s.syncStage != oldStage {
go s.notifyStageChanged()
}
}()
if s.syncStage != headersSynced|mptSynced|blocksSynced {
return
}
Expand All @@ -474,6 +517,14 @@ func (s *Module) BlockHeight() uint32 {
return s.blockHeight
}

// HeaderHeight returns index of the last stored header.
func (s *Module) HeaderHeight() uint32 {
s.lock.RLock()
defer s.lock.RUnlock()

return s.bc.HeaderHeight()
}

// IsActive tells whether state sync module is on and still gathering state
// synchronisation data (headers, blocks or MPT nodes).
func (s *Module) IsActive() bool {
Expand Down Expand Up @@ -508,6 +559,14 @@ func (s *Module) NeedMPTNodes() bool {
return s.syncStage&headersSynced != 0 && s.syncStage&mptSynced == 0
}

// NeedBlocks returns whether the module hasn't completed blocks synchronisation.
func (s *Module) NeedBlocks() bool {
s.lock.RLock()
defer s.lock.RUnlock()

return s.syncStage&headersSynced != 0 && s.syncStage&blocksSynced == 0
}

// Traverse traverses local MPT nodes starting from the specified root down to its
// children calling `process` for each serialised node until stop condition is satisfied.
func (s *Module) Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error {
Expand All @@ -533,7 +592,7 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {
return s.mptpool.GetBatch(limit)
}

// HeaderHeight returns the height of the latest header.
func (s *Module) HeaderHeight() uint32 {
return s.bc.HeaderHeight()
// GetConfig returns current blockchain configuration.
func (s *Module) GetConfig() config.Blockchain {
return s.bc.GetConfig()
}
4 changes: 4 additions & 0 deletions pkg/network/state_sync.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package network

import (
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
"github.com/nspcc-dev/neo-go/pkg/util"
Expand All @@ -15,6 +16,9 @@ type StateSync interface {
IsInitialized() bool
GetUnknownMPTNodesBatch(limit int) []util.Uint256
NeedHeaders() bool
NeedBlocks() bool
NeedMPTNodes() bool
Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error
GetConfig() config.Blockchain
SetOnStageChanged(func())
}
18 changes: 11 additions & 7 deletions pkg/services/blockfetcher/blockfetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"

"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand All @@ -14,6 +14,10 @@ type mockLedger struct {
height uint32
}

func (m *mockLedger) HeaderHeight() uint32 {
return m.height
}

func (m *mockLedger) GetConfig() config.Blockchain {
return config.Blockchain{}
}
Expand All @@ -26,7 +30,7 @@ type mockPutBlockFunc struct {
putCalled bool
}

func (m *mockPutBlockFunc) putBlock(b *block.Block) error {
func (m *mockPutBlockFunc) put(b any) error {
m.putCalled = true
return nil
}
Expand All @@ -46,7 +50,7 @@ func TestServiceConstructor(t *testing.T) {
OIDBatchSize: 0,
DownloaderWorkersCount: 0,
}
_, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
_, err := New(ledger, cfg, logger, mockPut.put, shutdownCallback, bqueue.Blocks)
require.Error(t, err)
})

Expand All @@ -57,7 +61,7 @@ func TestServiceConstructor(t *testing.T) {
},
Addresses: []string{},
}
_, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
_, err := New(ledger, cfg, logger, mockPut.put, shutdownCallback, bqueue.Blocks)
require.Error(t, err)
})

Expand All @@ -69,7 +73,7 @@ func TestServiceConstructor(t *testing.T) {
Addresses: []string{"localhost:8080"},
BQueueSize: DefaultQueueCacheSize,
}
service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
service, err := New(ledger, cfg, logger, mockPut.put, shutdownCallback, bqueue.Blocks)
require.NoError(t, err)
require.NotNil(t, service)

Expand All @@ -87,7 +91,7 @@ func TestServiceConstructor(t *testing.T) {
},
Addresses: []string{"localhost:1"},
}
service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
service, err := New(ledger, cfg, logger, mockPut.put, shutdownCallback, bqueue.Blocks)
require.NoError(t, err)
err = service.Start()
require.Error(t, err)
Expand All @@ -106,7 +110,7 @@ func TestServiceConstructor(t *testing.T) {
},
},
}
_, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
_, err := New(ledger, cfg, logger, mockPut.put, shutdownCallback, bqueue.Blocks)
require.Error(t, err)
require.Contains(t, err.Error(), "open wallet: open invalid/path/to/wallet.json:")
})
Expand Down

0 comments on commit 5e284a3

Please sign in to comment.