diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index a2553b61a1..0833b2702a 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -428,6 +428,11 @@ func (s *FakeStateSync) BlockHeight() uint32 { return 0 } +// HeaderHeight implements the StateSync interface. +func (s *FakeStateSync) HeaderHeight() uint32 { + return 0 +} + // IsActive implements the StateSync interface. func (s *FakeStateSync) IsActive() bool { return s.IsActiveFlag.Load() } @@ -447,6 +452,8 @@ func (s *FakeStateSync) Init(currChainHeight uint32) error { // NeedHeaders implements the StateSync interface. func (s *FakeStateSync) NeedHeaders() bool { return s.RequestHeaders.Load() } +func (s *FakeStateSync) NeedBlocks() bool { return false } + // NeedMPTNodes implements the StateSync interface. func (s *FakeStateSync) NeedMPTNodes() bool { panic("TODO") @@ -464,3 +471,13 @@ func (s *FakeStateSync) Traverse(root util.Uint256, process func(node mpt.Node, func (s *FakeStateSync) GetUnknownMPTNodesBatch(limit int) []util.Uint256 { panic("TODO") } + +// GetConfig implements the StateSync interface. +func (s *FakeStateSync) GetConfig() config.Blockchain { + panic("TODO") +} + +// SetOnStageChanged implements the StateSync interface. +func (s *FakeStateSync) SetOnStageChanged(func()) { + panic("TODO") +} diff --git a/pkg/config/config.go b/pkg/config/config.go index ac8bc5d7be..302316e2ae 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -64,6 +64,7 @@ func (c Config) Blockchain() Blockchain { return Blockchain{ ProtocolConfiguration: c.ProtocolConfiguration, Ledger: c.ApplicationConfiguration.Ledger, + NeoFSBlockFetcher: c.ApplicationConfiguration.NeoFSBlockFetcher, } } diff --git a/pkg/config/ledger_config.go b/pkg/config/ledger_config.go index ff6d59948a..98e6d2f92c 100644 --- a/pkg/config/ledger_config.go +++ b/pkg/config/ledger_config.go @@ -29,4 +29,5 @@ type Ledger struct { type Blockchain struct { ProtocolConfiguration Ledger + NeoFSBlockFetcher } diff --git a/pkg/config/protocol_config.go b/pkg/config/protocol_config.go index c72b8ccdbd..5720ce49bc 100644 --- a/pkg/config/protocol_config.go +++ b/pkg/config/protocol_config.go @@ -49,6 +49,8 @@ type ( P2PSigExtensions bool `yaml:"P2PSigExtensions"` // P2PStateExchangeExtensions enables additional P2P MPT state data exchange logic. P2PStateExchangeExtensions bool `yaml:"P2PStateExchangeExtensions"` + // NeoFSStateSyncExtensions enables additional MPT state data exchange logic via NeoFS. + NeoFSStateSyncExtensions bool `yaml:"NeoFSStateSyncExtensions"` // ReservedAttributes allows to have reserved attributes range for experimental or private purposes. ReservedAttributes bool `yaml:"ReservedAttributes"` diff --git a/pkg/core/block/header.go b/pkg/core/block/header.go index dc606cb28c..63c7925b58 100644 --- a/pkg/core/block/header.go +++ b/pkg/core/block/header.go @@ -85,6 +85,16 @@ func (b *Header) GetIndex() uint32 { return b.Index } +// GetExpectedHeaderSize returns the expected header size with empty witness. +func (b *Header) GetExpectedHeaderSize() int { + size := expectedHeaderSizeWithEmptyWitness - 1 - 1 + // 1 is for the zero-length (new(Header)).Script.Invocation/Verification + io.GetVarSize(&b.Script) + if b.StateRootEnabled { + size += util.Uint256Size + } + return size +} + // Hash returns the hash of the block. Notice that it is cached internally, // so no matter how you change the [Header] after the first invocation of this // method it won't change. To get an updated hash in case you're changing diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 97549b80ef..1c410bf1cf 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -286,8 +286,8 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement)) } if cfg.P2PStateExchangeExtensions { - if !cfg.StateRootInHeader { - return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off") + if !cfg.StateRootInHeader && !cfg.NeoFSStateSyncExtensions { + return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader or NeoFSStateSyncExtensions are off") } if cfg.KeepOnlyLatestState && !cfg.RemoveUntraceableBlocks { return nil, errors.New("P2PStateExchangeExtensions can be enabled either on MPT-complete node (KeepOnlyLatestState=false) or on light GC-enabled node (RemoveUntraceableBlocks=true)") diff --git a/pkg/core/blockchain_core_test.go b/pkg/core/blockchain_core_test.go index 7f71a57905..8f26ab984d 100644 --- a/pkg/core/blockchain_core_test.go +++ b/pkg/core/blockchain_core_test.go @@ -163,7 +163,7 @@ func TestNewBlockchainIncosistencies(t *testing.T) { t.Run("state exchange without state root", func(t *testing.T) { checkNewBlockchainErr(t, func(c *config.Config) { c.ProtocolConfiguration.P2PStateExchangeExtensions = true - }, storage.NewMemoryStore(), "P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off") + }, storage.NewMemoryStore(), "P2PStatesExchangeExtensions are enabled, but StateRootInHeader or NeoFSStateSyncExtensions are off") }) } diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go index e377071c43..95f328c159 100644 --- a/pkg/core/statesync/module.go +++ b/pkg/core/statesync/module.go @@ -94,6 +94,10 @@ type Module struct { billet *mpt.Billet jumpCallback func(p uint32) error + + // stageChangedCallback is an optional callback that is triggered whenever + // the sync stage changes. + stageChangedCallback func() } func (s *Module) AddItem(b bqueue.Queueable) error { @@ -139,9 +143,14 @@ func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Si // Init initializes state sync module for the current chain's height with given // callback for MPT nodes requests. func (s *Module) Init(currChainHeight uint32) error { + oldStage := s.syncStage s.lock.Lock() defer s.lock.Unlock() - + defer func() { + if s.syncStage != oldStage { + go s.notifyStageChanged() + } + }() if s.syncStage != none { return errors.New("already initialized or inactive") } @@ -195,6 +204,20 @@ func (s *Module) Init(currChainHeight uint32) error { return s.defineSyncStage() } +// SetOnStageChanged sets callback that is triggered whenever the sync stage changes. +func (s *Module) SetOnStageChanged(cb func()) { + s.lock.Lock() + defer s.lock.Unlock() + s.stageChangedCallback = cb +} + +// notifyStageChanged triggers stage callback if it's set. +func (s *Module) notifyStageChanged() { + if s.stageChangedCallback != nil { + s.stageChangedCallback() + } +} + // TemporaryPrefix accepts current storage prefix and returns prefix // to use for storing intermediate items during synchronization. func TemporaryPrefix(currPrefix storage.KeyPrefix) storage.KeyPrefix { @@ -211,6 +234,12 @@ func TemporaryPrefix(currPrefix storage.KeyPrefix) storage.KeyPrefix { // defineSyncStage sequentially checks and sets sync state process stage after Module // initialization. It also performs initialization of MPT Billet if necessary. func (s *Module) defineSyncStage() error { + oldStage := s.syncStage + defer func() { + if s.syncStage != oldStage { + go s.notifyStageChanged() + } + }() // check headers sync stage first ltstHeaderHeight := s.bc.HeaderHeight() if ltstHeaderHeight > s.syncPoint { @@ -330,6 +359,7 @@ func (s *Module) AddHeaders(hdrs ...*block.Header) error { // AddBlock verifies and saves block skipping executable scripts. func (s *Module) AddBlock(block *block.Block) error { + oldStage := s.syncStage s.lock.Lock() defer s.lock.Unlock() @@ -375,6 +405,9 @@ func (s *Module) AddBlock(block *block.Block) error { s.syncStage |= blocksSynced s.log.Info("blocks are in sync", zap.Uint32("blockHeight", s.blockHeight)) + if s.syncStage != oldStage { + go s.notifyStageChanged() + } s.checkSyncIsCompleted() } return nil @@ -385,6 +418,7 @@ func (s *Module) AddBlock(block *block.Block) error { func (s *Module) AddMPTNodes(nodes [][]byte) error { s.lock.Lock() defer s.lock.Unlock() + oldStage := s.syncStage if s.syncStage&headersSynced == 0 || s.syncStage&mptSynced != 0 { return errors.New("MPT nodes were not requested") @@ -406,6 +440,9 @@ func (s *Module) AddMPTNodes(nodes [][]byte) error { s.syncStage |= mptSynced s.log.Info("MPT is in sync", zap.Uint32("height", s.syncPoint)) + if s.syncStage != oldStage { + go s.notifyStageChanged() + } s.checkSyncIsCompleted() } return nil @@ -449,6 +486,12 @@ func (s *Module) restoreNode(n mpt.Node) error { // If so, then jumping to P state sync point occurs. It is not protected by lock, thus caller // should take care of it. func (s *Module) checkSyncIsCompleted() { + oldStage := s.syncStage + defer func() { + if s.syncStage != oldStage { + go s.notifyStageChanged() + } + }() if s.syncStage != headersSynced|mptSynced|blocksSynced { return } @@ -474,6 +517,14 @@ func (s *Module) BlockHeight() uint32 { return s.blockHeight } +// HeaderHeight returns index of the last stored header. +func (s *Module) HeaderHeight() uint32 { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.bc.HeaderHeight() +} + // IsActive tells whether state sync module is on and still gathering state // synchronisation data (headers, blocks or MPT nodes). func (s *Module) IsActive() bool { @@ -508,6 +559,14 @@ func (s *Module) NeedMPTNodes() bool { return s.syncStage&headersSynced != 0 && s.syncStage&mptSynced == 0 } +// NeedBlocks returns whether the module hasn't completed blocks synchronisation. +func (s *Module) NeedBlocks() bool { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.syncStage&headersSynced != 0 && s.syncStage&blocksSynced == 0 +} + // Traverse traverses local MPT nodes starting from the specified root down to its // children calling `process` for each serialised node until stop condition is satisfied. func (s *Module) Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error { @@ -533,7 +592,7 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 { return s.mptpool.GetBatch(limit) } -// HeaderHeight returns the height of the latest header. -func (s *Module) HeaderHeight() uint32 { - return s.bc.HeaderHeight() +// GetConfig returns current blockchain configuration. +func (s *Module) GetConfig() config.Blockchain { + return s.bc.GetConfig() } diff --git a/pkg/network/state_sync.go b/pkg/network/state_sync.go index f98ea8469f..629d8e8c50 100644 --- a/pkg/network/state_sync.go +++ b/pkg/network/state_sync.go @@ -1,6 +1,7 @@ package network import ( + "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/network/bqueue" "github.com/nspcc-dev/neo-go/pkg/util" @@ -15,6 +16,9 @@ type StateSync interface { IsInitialized() bool GetUnknownMPTNodesBatch(limit int) []util.Uint256 NeedHeaders() bool + NeedBlocks() bool NeedMPTNodes() bool Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error + GetConfig() config.Blockchain + SetOnStageChanged(func()) } diff --git a/pkg/services/blockfetcher/blockfetcher_test.go b/pkg/services/blockfetcher/blockfetcher_test.go index 970a5d456e..4ba9394f00 100644 --- a/pkg/services/blockfetcher/blockfetcher_test.go +++ b/pkg/services/blockfetcher/blockfetcher_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/nspcc-dev/neo-go/pkg/config" - "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/network/bqueue" "github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -14,6 +14,10 @@ type mockLedger struct { height uint32 } +func (m *mockLedger) HeaderHeight() uint32 { + return m.height +} + func (m *mockLedger) GetConfig() config.Blockchain { return config.Blockchain{} } @@ -26,7 +30,7 @@ type mockPutBlockFunc struct { putCalled bool } -func (m *mockPutBlockFunc) putBlock(b *block.Block) error { +func (m *mockPutBlockFunc) put(b any) error { m.putCalled = true return nil } @@ -46,7 +50,7 @@ func TestServiceConstructor(t *testing.T) { OIDBatchSize: 0, DownloaderWorkersCount: 0, } - _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + _, err := New(ledger, cfg, logger, mockPut.put, shutdownCallback, bqueue.Blocks) require.Error(t, err) }) @@ -57,7 +61,7 @@ func TestServiceConstructor(t *testing.T) { }, Addresses: []string{}, } - _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + _, err := New(ledger, cfg, logger, mockPut.put, shutdownCallback, bqueue.Blocks) require.Error(t, err) }) @@ -69,7 +73,7 @@ func TestServiceConstructor(t *testing.T) { Addresses: []string{"localhost:8080"}, BQueueSize: DefaultQueueCacheSize, } - service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + service, err := New(ledger, cfg, logger, mockPut.put, shutdownCallback, bqueue.Blocks) require.NoError(t, err) require.NotNil(t, service) @@ -87,7 +91,7 @@ func TestServiceConstructor(t *testing.T) { }, Addresses: []string{"localhost:1"}, } - service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + service, err := New(ledger, cfg, logger, mockPut.put, shutdownCallback, bqueue.Blocks) require.NoError(t, err) err = service.Start() require.Error(t, err) @@ -106,7 +110,7 @@ func TestServiceConstructor(t *testing.T) { }, }, } - _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + _, err := New(ledger, cfg, logger, mockPut.put, shutdownCallback, bqueue.Blocks) require.Error(t, err) require.Contains(t, err.Error(), "open wallet: open invalid/path/to/wallet.json:") })