diff --git a/go/common/host/host.go b/go/common/host/host.go
index bc733e923a..16a1e8ab1c 100644
--- a/go/common/host/host.go
+++ b/go/common/host/host.go
@@ -28,8 +28,6 @@ type Host interface {
 
 	// HealthCheck returns the health status of the host + enclave + db
 	HealthCheck() (*HealthCheck, error)
-
-	P2PTxHandler
 }
 
 type BlockStream struct {
diff --git a/go/common/host/identity.go b/go/common/host/identity.go
index 75430839b9..6e5cf5cef5 100644
--- a/go/common/host/identity.go
+++ b/go/common/host/identity.go
@@ -1,15 +1,23 @@
 package host
 
-import gethcommon "github.com/ethereum/go-ethereum/common"
+import (
+	gethcommon "github.com/ethereum/go-ethereum/common"
+	"github.com/obscuronet/go-obscuro/go/common"
+	"github.com/obscuronet/go-obscuro/go/config"
+)
 
 type Identity struct {
 	ID               gethcommon.Address
 	P2PPublicAddress string
+	IsGenesis        bool
+	IsSequencer      bool
 }
 
-func NewIdentity(id gethcommon.Address, p2pPublicAddress string) Identity {
+func NewIdentity(cfg *config.HostConfig) Identity {
 	return Identity{
-		ID:               id,
-		P2PPublicAddress: p2pPublicAddress,
+		ID:               cfg.ID,
+		P2PPublicAddress: cfg.P2PPublicAddress,
+		IsGenesis:        cfg.IsGenesis,
+		IsSequencer:      cfg.NodeType == common.Sequencer,
 	}
 }
diff --git a/go/common/host/services.go b/go/common/host/services.go
index d8881122e5..53b2d06825 100644
--- a/go/common/host/services.go
+++ b/go/common/host/services.go
@@ -3,6 +3,9 @@ package host
 import (
 	"math/big"
 
+	"github.com/ethereum/go-ethereum/rpc"
+	"github.com/obscuronet/go-obscuro/go/responses"
+
 	gethcommon "github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/obscuronet/go-obscuro/go/common"
@@ -11,10 +14,12 @@ import (
 
 // service names - these are the keys used to register known services with the host
 const (
-	P2PName               = "p2p"
-	L1BlockRepositoryName = "l1-block-repo"
-	L1PublisherName       = "l1-publisher"
-	L2BatchRepositoryName = "l2-batch-repo"
+	P2PName                    = "p2p"
+	L1BlockRepositoryName      = "l1-block-repo"
+	L1PublisherName            = "l1-publisher"
+	L2BatchRepositoryName      = "l2-batch-repo"
+	EnclaveServiceName         = "enclaves"
+	LogSubscriptionServiceName = "log-subs"
 )
 
 // The host has a number of services that encapsulate the various responsibilities of the host.
@@ -22,6 +27,8 @@ const (
 // should depend on these interfaces rather than the concrete implementations.
 
 // Service interface allows the host to manage all services in a generic way
+// Note: Services may depend on other services but they shouldn't use them during construction, only when 'Start()' is called.
+// They should be resilient to other services being unavailable, because the construction ordering is not guaranteed.
type Service interface { Start() error Stop() error @@ -48,8 +55,8 @@ type P2P interface { // todo (@matt) feels a bit weird to have this in this interface since it relates to serving data rather than receiving SubscribeForBatchRequests(handler P2PBatchRequestHandler) func() - // UpdatePeerList allows the host to notify the p2p service of a change in the peer list - UpdatePeerList([]string) + // RefreshPeerList notifies the P2P service that its peer list might be out-of-date and it should resync + RefreshPeerList() } // P2PBatchHandler is an interface for receiving new batches from the P2P network as they arrive @@ -77,7 +84,7 @@ type L1BlockRepository interface { FetchBlockByHeight(height *big.Int) (*types.Block, error) // FetchNextBlock returns the next canonical block after a given block hash // It returns the new block, a bool which is true if the block is the current L1 head and a bool if the block is on a different fork to prevBlock - FetchNextBlock(prevBlock gethcommon.Hash) (*types.Block, bool, bool, error) + FetchNextBlock(prevBlock gethcommon.Hash) (*types.Block, bool, error) // FetchReceipts returns the receipts for a given L1 block FetchReceipts(block *common.L1Block) types.Receipts } @@ -133,4 +140,20 @@ type EnclaveService interface { // LookupBatchBySeqNo is used to fetch batch data from the enclave - it is only used as a fallback for the sequencer // host if it's missing a batch (other host services should use L2Repo to fetch batch data) LookupBatchBySeqNo(seqNo *big.Int) (*common.ExtBatch, error) + + // GetEnclaveClient returns an enclave client // todo (@matt) we probably don't want to expose this + GetEnclaveClient() common.Enclave + + // SubmitAndBroadcastTx submits an encrypted transaction to the enclave, and broadcasts it to other hosts on the network (in particular, to the sequencer) + SubmitAndBroadcastTx(encryptedParams common.EncryptedParamsSendRawTx) (*responses.RawTx, error) + + Subscribe(id rpc.ID, encryptedLogSubscription common.EncryptedParamsLogSubscription) error + Unsubscribe(id rpc.ID) error +} + +// LogSubscriptionManager provides an interface for the host to manage log subscriptions +type LogSubscriptionManager interface { + Subscribe(id rpc.ID, encryptedLogSubscription common.EncryptedParamsLogSubscription, matchedLogsCh chan []byte) error + Unsubscribe(id rpc.ID) + SendLogsToSubscribers(result *common.EncryptedSubscriptionLogs) } diff --git a/go/common/types.go b/go/common/types.go index 0f75789650..f3ab0d10b0 100644 --- a/go/common/types.go +++ b/go/common/types.go @@ -69,6 +69,7 @@ type ( const ( L2GenesisHeight = uint64(0) L1GenesisHeight = uint64(0) + L2GenesisSeqNo = uint64(1) // HeightCommittedBlocks is the number of blocks deep a transaction must be to be considered safe from reorganisations. 
HeightCommittedBlocks = 15 ) diff --git a/go/enclave/components/batch_executor.go b/go/enclave/components/batch_executor.go index 6699d41646..f1c6cc4655 100644 --- a/go/enclave/components/batch_executor.go +++ b/go/enclave/components/batch_executor.go @@ -88,7 +88,8 @@ func (executor *batchExecutor) ComputeBatch(context *BatchExecutionContext) (*Co } var messages common.CrossChainMessages - if context.SequencerNo.Int64() > 1 { + // Cross chain data is not accessible until one after the genesis batch + if context.SequencerNo.Int64() > int64(common.L2GenesisSeqNo+1) { messages = executor.crossChainProcessors.Local.RetrieveInboundMessages(parentBlock, block, stateDB) } crossChainTransactions := executor.crossChainProcessors.Local.CreateSyntheticTransactions(messages, stateDB) @@ -186,7 +187,7 @@ func (executor *batchExecutor) CreateGenesisState(blkHash common.L1BlockHash, ti Root: *preFundGenesisState, TxHash: types.EmptyRootHash, Number: big.NewInt(int64(0)), - SequencerOrderNo: big.NewInt(int64(0)), + SequencerOrderNo: big.NewInt(int64(common.L2GenesisSeqNo)), // genesis batch has seq number 1 ReceiptHash: types.EmptyRootHash, Time: timeNow, }, diff --git a/go/enclave/components/rollup_producer.go b/go/enclave/components/rollup_producer.go index 9049bd4c3d..0caa526a69 100644 --- a/go/enclave/components/rollup_producer.go +++ b/go/enclave/components/rollup_producer.go @@ -54,7 +54,7 @@ func NewRollupProducer(sequencerID gethcommon.Address, transactionBlobCrypto cry func (re *rollupProducerImpl) CreateRollup(fromBatchNo uint64, limiter limiters.RollupLimiter) (*core.Rollup, error) { batches, err := re.batchRegistry.BatchesAfter(fromBatchNo, limiter) if err != nil { - return nil, err + return nil, fmt.Errorf("could not fetch 'from' batch (seqNo=%d) for rollup: %w", fromBatchNo, err) } hasBatches := len(batches) != 0 diff --git a/go/enclave/enclave.go b/go/enclave/enclave.go index e213adcd73..30bcc7ede4 100644 --- a/go/enclave/enclave.go +++ b/go/enclave/enclave.go @@ -63,6 +63,8 @@ import ( gethrpc "github.com/ethereum/go-ethereum/rpc" ) +var _noHeadBatch = big.NewInt(0) + type enclaveImpl struct { config *config.EnclaveConfig storage storage.Storage @@ -305,20 +307,21 @@ func (e *enclaveImpl) Status() (common.Status, common.SystemError) { _, err := e.storage.FetchSecret() if err != nil { if errors.Is(err, errutil.ErrNotFound) { - return common.Status{StatusCode: common.AwaitingSecret}, nil + return common.Status{StatusCode: common.AwaitingSecret, L2Head: _noHeadBatch}, nil } return common.Status{StatusCode: common.Unavailable}, responses.ToInternalError(err) } - l1Head, err := e.storage.FetchHeadBlock() var l1HeadHash gethcommon.Hash + l1Head, err := e.storage.FetchHeadBlock() if err != nil { // this might be normal while enclave is starting up, just send empty hash e.logger.Debug("failed to fetch L1 head block for status response", log.ErrKey, err) } else { l1HeadHash = l1Head.Hash() } + // we use zero when there's no head batch yet, the first seq number is 1 + l2HeadSeqNo := _noHeadBatch l2Head, err := e.storage.FetchHeadBatch() - var l2HeadSeqNo *big.Int if err != nil { // this might be normal while enclave is starting up, just send empty hash e.logger.Debug("failed to fetch L2 head batch for status response", log.ErrKey, err) diff --git a/go/enclave/enclave_test.go b/go/enclave/enclave_test.go index c56b1427cc..c64df39df3 100644 --- a/go/enclave/enclave_test.go +++ b/go/enclave/enclave_test.go @@ -471,7 +471,7 @@ func dummyBatch(blkHash gethcommon.Hash, height uint64, state 
*state.StateDB) *c L1Proof: blkHash, Root: state.IntermediateRoot(true), Number: big.NewInt(int64(height)), - SequencerOrderNo: big.NewInt(int64(height)), + SequencerOrderNo: big.NewInt(int64(height + 1)), // seq number starts at 1 so need to offset ReceiptHash: types.EmptyRootHash, Time: uint64(time.Now().Unix()), } diff --git a/go/enclave/nodetype/sequencer.go b/go/enclave/nodetype/sequencer.go index 853cfbbb57..0795514ca3 100644 --- a/go/enclave/nodetype/sequencer.go +++ b/go/enclave/nodetype/sequencer.go @@ -253,7 +253,7 @@ func (s *sequencer) CreateRollup(lastBatchNo uint64) (*common.ExtRollup, error) } if err := s.signRollup(rollup); err != nil { - return nil, err + return nil, fmt.Errorf("failed to sign created rollup: %w", err) } s.logger.Info("Created new head rollup", log.RollupHashKey, rollup.Hash(), "numBatches", len(rollup.Batches)) diff --git a/go/enclave/rpc_server.go b/go/enclave/rpc_server.go index 33fe5a7f21..0606341bcc 100644 --- a/go/enclave/rpc_server.go +++ b/go/enclave/rpc_server.go @@ -233,7 +233,7 @@ func (s *RPCServer) HealthCheck(_ context.Context, _ *generated.EmptyArgs) (*gen func (s *RPCServer) CreateRollup(_ context.Context, req *generated.CreateRollupRequest) (*generated.CreateRollupResponse, error) { var fromSeqNo uint64 = 1 - if req.FromSequenceNumber != nil { + if req.FromSequenceNumber != nil && *req.FromSequenceNumber > common.L2GenesisSeqNo { fromSeqNo = *req.FromSequenceNumber } diff --git a/go/host/container/host_container.go b/go/host/container/host_container.go index aa90c70c4a..bd4d54dc06 100644 --- a/go/host/container/host_container.go +++ b/go/host/container/host_container.go @@ -127,32 +127,22 @@ func NewHostContainerFromConfig(parsedConfig *config.HostInputConfig, logger get cfg.ID = ethWallet.Address() fmt.Println("Connecting to the enclave...") + services := host.NewServicesRegistry(logger) enclaveClient := enclaverpc.NewClient(cfg, logger) p2pLogger := logger.New(log.CmpKey, log.P2PCmp) metricsService := metrics.New(cfg.MetricsEnabled, cfg.MetricsHTTPPort, logger) - aggP2P := p2p.NewSocketP2PLayer(cfg, p2pLogger, metricsService.Registry()) + aggP2P := p2p.NewSocketP2PLayer(cfg, services, p2pLogger, metricsService.Registry()) rpcServer := clientrpc.NewServer(cfg, logger) mgmtContractLib := mgmtcontractlib.NewMgmtContractLib(&cfg.ManagementContractAddress, logger) - return NewHostContainer(cfg, aggP2P, l1Client, enclaveClient, mgmtContractLib, ethWallet, rpcServer, logger, metricsService) + return NewHostContainer(cfg, services, aggP2P, l1Client, enclaveClient, mgmtContractLib, ethWallet, rpcServer, logger, metricsService) } // NewHostContainer builds a host container with dependency injection rather than from config. // Useful for testing etc. 
(want to be able to pass in logger, and also have option to mock out dependencies) -func NewHostContainer( - cfg *config.HostConfig, // provides various parameters that the host needs to function - // todo (@matt) sort out all this wiring, we should depend on interfaces not concrete types, we should make it obvious and consistent how services are instantiated - p2p host.P2PHostService, // provides the inbound and outbound p2p communication layer - l1Client ethadapter.EthClient, // provides inbound and outbound L1 connectivity - enclaveClient common.Enclave, // provides RPC connection to this host's Enclave - contractLib mgmtcontractlib.MgmtContractLib, // provides the management contract lib injection - hostWallet wallet.Wallet, // provides an L1 wallet for the host's transactions - rpcServer clientrpc.Server, // For communication with Obscuro client applications - logger gethlog.Logger, // provides logging with context - metricsService *metrics.Service, // provides the metrics service for other packages to use -) *HostContainer { - h := host.NewHost(cfg, p2p, l1Client, enclaveClient, hostWallet, contractLib, logger, metricsService.Registry()) +func NewHostContainer(cfg *config.HostConfig, services *host.ServicesRegistry, p2p host.P2PHostService, l1Client ethadapter.EthClient, enclaveClient common.Enclave, contractLib mgmtcontractlib.MgmtContractLib, hostWallet wallet.Wallet, rpcServer clientrpc.Server, logger gethlog.Logger, metricsService *metrics.Service) *HostContainer { + h := host.NewHost(cfg, services, p2p, l1Client, enclaveClient, hostWallet, contractLib, logger, metricsService.Registry()) hostContainer := &HostContainer{ host: h, diff --git a/go/host/db/batches.go b/go/host/db/batches.go index 02b9902c81..c70506b4ea 100644 --- a/go/host/db/batches.go +++ b/go/host/db/batches.go @@ -127,7 +127,7 @@ func (db *DB) GetBatchBySequenceNumber(sequenceNumber *big.Int) (*common.ExtBatc db.batchReads.Inc(1) batchHash, err := db.readBatchHashBySequenceNumber(sequenceNumber) if err != nil { - return nil, errors.Wrapf(err, "could not retrieve batch hash for seqNo=%d", sequenceNumber) + return nil, fmt.Errorf("could not retrieve batch hash for seqNo=%d: %w", sequenceNumber, err) } return db.GetBatch(*batchHash) } diff --git a/go/host/enclave/guardian.go b/go/host/enclave/guardian.go new file mode 100644 index 0000000000..7c13744c74 --- /dev/null +++ b/go/host/enclave/guardian.go @@ -0,0 +1,594 @@ +package enclave + +import ( + "fmt" + "math/big" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/obscuronet/go-obscuro/go/common/gethutil" + + "github.com/kamilsk/breaker" + + "github.com/ethereum/go-ethereum/core/types" + gethlog "github.com/ethereum/go-ethereum/log" + "github.com/obscuronet/go-obscuro/go/common" + "github.com/obscuronet/go-obscuro/go/common/errutil" + "github.com/obscuronet/go-obscuro/go/common/host" + "github.com/obscuronet/go-obscuro/go/common/log" + "github.com/obscuronet/go-obscuro/go/common/retry" + "github.com/obscuronet/go-obscuro/go/config" + "github.com/obscuronet/go-obscuro/go/host/db" + "github.com/obscuronet/go-obscuro/go/host/l1" + "github.com/pkg/errors" +) + +const ( + // time between loops on mainLoop, will be retry time if things are failing + _retryInterval = 100 * time.Millisecond + + // when enclave is healthy this is the time before we call its status (can be slow, is just a sanity check) + _monitoringInterval = 1 * time.Second + + // when we have submitted request to L1 for the secret, how long do we wait for an answer before we retry + 
_maxWaitForSecretResponse = 2 * time.Minute
+)
+
+// This private interface enforces the services that the guardian depends on
+type guardianServiceLocator interface {
+	P2P() host.P2P
+	L1Publisher() host.L1Publisher
+	L1Repo() host.L1BlockRepository
+	L2Repo() host.L2BatchRepository
+	LogSubs() host.LogSubscriptionManager
+}
+
+// Guardian is a host service which monitors an enclave, its responsibilities include:
+// - monitor the enclave state and feed it the data it needs
+// - if it is an active sequencer then the guardian will trigger batch/rollup creation
+// - guardian provides access to the enclave data and reports the enclave status for other services - acting as a gatekeeper
+type Guardian struct {
+	hostData      host.Identity
+	state         *StateTracker // state machine that tracks our view of the enclave's state
+	enclaveClient common.Enclave
+
+	sl guardianServiceLocator
+	db *db.DB
+
+	submitDataLock sync.Mutex // we only submit one block, batch or transaction to enclave at a time
+
+	batchInterval  time.Duration
+	rollupInterval time.Duration
+
+	running         atomic.Bool
+	hostInterrupter breaker.Interface // the host's interrupter, so we can stop quickly
+
+	logger gethlog.Logger
+}
+
+func NewGuardian(cfg *config.HostConfig, hostData host.Identity, serviceLocator guardianServiceLocator, enclaveClient common.Enclave, db *db.DB, interrupter breaker.Interface, logger gethlog.Logger) *Guardian {
+	return &Guardian{
+		hostData:        hostData,
+		state:           NewStateTracker(logger),
+		enclaveClient:   enclaveClient,
+		sl:              serviceLocator,
+		batchInterval:   cfg.BatchInterval,
+		rollupInterval:  cfg.RollupInterval,
+		db:              db,
+		hostInterrupter: interrupter,
+		logger:          logger,
+	}
+}
+
+func (g *Guardian) Start() error {
+	g.running.Store(true)
+	go g.mainLoop()
+	if g.hostData.IsSequencer {
+		// if we are a sequencer then we need to start the periodic batch/rollup production
+		// Note: after HA work this will need an additional check that we are the **active** sequencer enclave
+		go g.periodicBatchProduction()
+		go g.periodicRollupProduction()
+	}
+
+	// subscribe for L1 and P2P data
+	g.sl.P2P().SubscribeForTx(g)
+	g.sl.L1Repo().Subscribe(g)
+	g.sl.L2Repo().Subscribe(g)
+
+	// start streaming data from the enclave
+	go g.streamEnclaveData()
+
+	return nil
+}
+
+func (g *Guardian) Stop() error {
+	g.running.Store(false)
+
+	err := g.enclaveClient.Stop()
+	if err != nil {
+		g.logger.Warn("error stopping enclave", log.ErrKey, err)
+	}
+
+	err = g.enclaveClient.StopClient()
+	if err != nil {
+		g.logger.Warn("error stopping enclave client", log.ErrKey, err)
+	}
+
+	return nil
+}
+
+func (g *Guardian) HealthStatus() host.HealthStatus {
+	// todo (@matt) do proper health status based on enclave state
+	errMsg := ""
+	if !g.running.Load() {
+		errMsg = "not running"
+	}
+	return &host.BasicErrHealthStatus{ErrMsg: errMsg}
+}
+
+func (g *Guardian) GetEnclaveState() *StateTracker {
+	return g.state
+}
+
+// GetEnclaveClient returns the enclave client for use by other services
+// todo (@matt) avoid exposing client directly and return errors if enclave is not ready for requests
+func (g *Guardian) GetEnclaveClient() common.Enclave {
+	return g.enclaveClient
+}
+
+// HandleBlock is called by the L1 repository when new blocks arrive.
+// Note: The L1 processing behaviour has two modes based on the state, either
+// - enclave is behind: lookup blocks to feed it 1-by-1 (see `catchupWithL1()`), ignore new live blocks that arrive here
+// - enclave is up-to-date: feed it these live blocks as they arrive, no need to lookup blocks
+func (g *Guardian) HandleBlock(block *types.Block) {
+	g.logger.Debug("Received L1 block", log.BlockHashKey, block.Hash(), log.BlockHeightKey, block.Number())
+	// record the newest block we've seen
+	g.state.OnReceivedBlock(block.Hash())
+	if !g.state.InSyncWithL1() {
+		// the enclave is still catching up with the L1 chain, it won't be able to process this new head block yet so return
+		return
+	}
+	err := g.submitL1Block(block, true)
+	if err != nil {
+		g.logger.Warn("failure processing L1 block", log.ErrKey, err)
+	}
+}
+
+// HandleBatch is called by the L2 repository when a new batch arrives
+// Note: this should only be called for validators, sequencers produce their own batches
+func (g *Guardian) HandleBatch(batch *common.ExtBatch) {
+	if g.hostData.IsSequencer {
+		g.logger.Error("repo received batch but we are a sequencer, ignoring")
+		return
+	}
+	g.logger.Debug("Received L2 batch", log.BatchHashKey, batch.Hash(), log.BatchSeqNoKey, batch.Header.SequencerOrderNo)
+	// record the newest batch we've seen
+	g.state.OnReceivedBatch(batch.Header.SequencerOrderNo)
+	if !g.state.IsUpToDate() {
+		return // ignore batches until we're up-to-date
+	}
+	err := g.submitL2Batch(batch)
+	if err != nil {
+		g.logger.Warn("error submitting batch to enclave", log.ErrKey, err)
+	}
+}
+
+func (g *Guardian) HandleTransaction(tx common.EncryptedTx) {
+	resp, sysError := g.enclaveClient.SubmitTx(tx)
+	if sysError != nil {
+		g.logger.Warn("could not submit transaction due to sysError", log.ErrKey, sysError)
+		return
+	}
+	if resp.Error() != nil {
+		g.logger.Trace("could not submit transaction", log.ErrKey, resp.Error())
+	}
+}
+
+// mainLoop runs until the enclave guardian is stopped. It checks the state of the enclave and takes action as
+// required to improve the state (e.g. provide a secret, catch up with L1, etc.)
+func (g *Guardian) mainLoop() { + g.logger.Debug("starting guardian main loop") + for g.running.Load() { + // check enclave status on every loop (this will happen whenever we hit an error while trying to resolve a state, + // or after the monitoring interval if we are healthy) + g.checkEnclaveStatus() + g.logger.Trace("mainLoop - enclave status", "status", g.state.GetStatus()) + switch g.state.GetStatus() { + case Disconnected, Unavailable: + // nothing to do, we are waiting for the enclave to be available + time.Sleep(_retryInterval) + case AwaitingSecret: + err := g.provideSecret() + if err != nil { + g.logger.Warn("could not provide secret to enclave", log.ErrKey, err) + time.Sleep(_retryInterval) + } + case L1Catchup: + // catchUpWithL1 will feed blocks 1-by-1 to the enclave until we are up-to-date, we hit an error or the guardian is stopped + err := g.catchupWithL1() + if err != nil { + g.logger.Warn("could not catch up with L1", log.ErrKey, err) + time.Sleep(_retryInterval) + } + case L2Catchup: + // catchUpWithL2 will feed batches 1-by-1 to the enclave until we are up-to-date, we hit an error or the guardian is stopped + err := g.catchupWithL2() + if err != nil { + g.logger.Warn("could not catch up with L2", log.ErrKey, err) + time.Sleep(_retryInterval) + } + case Live: + // we're healthy: loop back to enclave status again after long monitoring interval + select { + case <-time.After(_monitoringInterval): + // loop back to check status + case <-g.hostInterrupter.Done(): + // stop sleeping, we've been interrupted by the host stopping + } + } + } + g.logger.Debug("stopping guardian main loop") +} + +func (g *Guardian) checkEnclaveStatus() { + s, err := g.enclaveClient.Status() + if err != nil { + g.logger.Error("could not get enclave status", log.ErrKey, err) + // we record this as a disconnection, we can't get any more info from the enclave about status currently + g.state.OnDisconnected() + return + } + g.state.OnEnclaveStatus(s) +} + +// This method implements the procedure by which a node obtains the secret +func (g *Guardian) provideSecret() error { + if g.hostData.IsGenesis { + // instead of requesting a secret, we generate one and broadcast it + return g.generateAndBroadcastSecret() + } + g.logger.Info("Requesting secret.") + att, err := g.enclaveClient.Attestation() + if err != nil { + return fmt.Errorf("could not retrieve attestation from enclave. 
Cause: %w", err) + } + if att.Owner != g.hostData.ID { + return fmt.Errorf("host has ID %s, but its enclave produced an attestation using ID %s", g.hostData.ID.Hex(), att.Owner.Hex()) + } + + g.logger.Info("Requesting secret.") + // returns the L1 block when the request was published, any response will be after that block + awaitFromBlock, err := g.sl.L1Publisher().RequestSecret(att) + if err != nil { + return errors.Wrap(err, "could not request secret from L1") + } + + // keep checking L1 blocks until we find a secret response for our request or timeout + err = retry.Do(func() error { + nextBlock, _, err := g.sl.L1Repo().FetchNextBlock(awaitFromBlock) + if err != nil { + return fmt.Errorf("next block after block=%s not found - %w", awaitFromBlock, err) + } + secretRespTxs := g.sl.L1Publisher().ExtractSecretResponses(nextBlock) + if err != nil { + return fmt.Errorf("could not extract secret responses from block=%s - %w", nextBlock.Hash(), err) + } + for _, scrt := range secretRespTxs { + if scrt.RequesterID.Hex() == g.hostData.ID.Hex() { + err = g.enclaveClient.InitEnclave(scrt.Secret) + if err != nil { + g.logger.Warn("could not initialize enclave with received secret response", log.ErrKey, err) + continue // try the next secret response in the block if there are more + } + return nil // successfully initialized enclave with secret, break out of retry loop function + } + } + awaitFromBlock = nextBlock.Hash() + return errors.New("no valid secret received in block") + }, retry.NewTimeoutStrategy(_maxWaitForSecretResponse, 500*time.Millisecond)) + if err != nil { + // something went wrong, check the enclave status in case it is an enclave problem and let the main loop try again when appropriate + return errors.Wrap(err, "no valid secret received for enclave") + } + + g.logger.Info("Secret received") + g.state.OnSecretProvided() + + // we're now ready to catch up with network, sync peer list + go g.sl.P2P().RefreshPeerList() + return nil +} + +func (g *Guardian) generateAndBroadcastSecret() error { + g.logger.Info("Node is genesis node. Broadcasting secret.") + // Create the shared secret and submit it to the management contract for storage + attestation, err := g.enclaveClient.Attestation() + if err != nil { + return fmt.Errorf("could not retrieve attestation from enclave. Cause: %w", err) + } + if attestation.Owner != g.hostData.ID { + return fmt.Errorf("genesis node has ID %s, but its enclave produced an attestation using ID %s", g.hostData.ID.Hex(), attestation.Owner.Hex()) + } + + secret, err := g.enclaveClient.GenerateSecret() + if err != nil { + return fmt.Errorf("could not generate secret. Cause: %w", err) + } + + err = g.sl.L1Publisher().InitializeSecret(attestation, secret) + if err != nil { + return errors.Wrap(err, "failed to initialise enclave secret") + } + g.logger.Info("Node is genesis node. 
Secret was broadcast.") + g.state.OnSecretProvided() + return nil +} + +func (g *Guardian) catchupWithL1() error { + // while we are behind the L1 head and still running, fetch and submit L1 blocks + for g.running.Load() && g.state.GetStatus() == L1Catchup { + l1Block, isLatest, err := g.sl.L1Repo().FetchNextBlock(g.state.GetEnclaveL1Head()) + if err != nil { + if errors.Is(err, l1.ErrNoNextBlock) { + if g.state.hostL1Head == gethutil.EmptyHash { + return fmt.Errorf("no L1 blocks found in repository") + } + return nil // we are up-to-date + } + return errors.Wrap(err, "could not fetch next L1 block") + } + err = g.submitL1Block(l1Block, isLatest) + if err != nil { + return err + } + } + return nil +} + +func (g *Guardian) catchupWithL2() error { + // while we are behind the L2 head and still running: + for g.running.Load() && g.state.GetStatus() == L2Catchup { + // request the next batch by sequence number (based on what the enclave has been fed so far) + prevHead := g.state.GetEnclaveL2Head() + nextHead := prevHead.Add(prevHead, big.NewInt(1)) + + g.logger.Trace("fetching next batch", log.BatchSeqNoKey, nextHead) + batch, err := g.sl.L2Repo().FetchBatchBySeqNo(nextHead) + if err != nil { + return errors.Wrap(err, "could not fetch next L2 batch") + } + + err = g.submitL2Batch(batch) + if err != nil { + return err + } + } + return nil +} + +func (g *Guardian) submitL1Block(block *common.L1Block, isLatest bool) error { + g.logger.Trace("submitting L1 block", log.BlockHashKey, block.Hash(), log.BlockHeightKey, block.Number()) + receipts := g.sl.L1Repo().FetchReceipts(block) + if !g.submitDataLock.TryLock() { + // we are already submitting a block, and we don't want to leak goroutines, we wil catch up with the block later + return errors.New("unable to submit block, already submitting another block") + } + resp, err := g.enclaveClient.SubmitL1Block(*block, receipts, isLatest) + g.submitDataLock.Unlock() + if err != nil { + if strings.Contains(err.Error(), errutil.ErrBlockAlreadyProcessed.Error()) { + // we have already processed this block, let's try the next canonical block + // this is most common when we are returning to a previous fork and the enclave has already seen some of the blocks on it + // note: logging this because we don't expect it to happen often and would like visibility on that. + g.logger.Info("L1 block already processed by enclave, trying the next block", "block", block.Hash()) + nextHeight := big.NewInt(0).Add(block.Number(), big.NewInt(1)) + nextCanonicalBlock, err := g.sl.L1Repo().FetchBlockByHeight(nextHeight) + if err != nil { + return fmt.Errorf("failed to fetch next block after forking block=%s: %w", block.Hash(), err) + } + return g.submitL1Block(nextCanonicalBlock, isLatest) + } + // something went wrong, return error and let the main loop check status and try again when appropriate + return errors.Wrap(err, "could not submit L1 block to enclave") + } + // successfully processed block, update the state + g.state.OnProcessedBlock(block.Hash()) + g.processL1BlockTransactions(block) + + // todo (@matt) this should not be here, it is only used by the RPC API server for batch data which will eventually just use L1 repo + err = g.db.AddBlockHeader(block.Header()) + if err != nil { + return fmt.Errorf("submitted block to enclave but could not store the block processing result. 
Cause: %w", err)
+	}
+
+	// todo: make sure this doesn't respond to old requests (once we have a proper protocol for that)
+	err = g.publishSharedSecretResponses(resp.ProducedSecretResponses)
+	if err != nil {
+		g.logger.Error("failed to publish response to secret request", log.ErrKey, err)
+	}
+	return nil
+}
+
+func (g *Guardian) processL1BlockTransactions(block *common.L1Block) {
+	// if there are any secret responses in the block we should refresh our P2P list to re-sync with the network
+	respTxs := g.sl.L1Publisher().ExtractSecretResponses(block)
+	if len(respTxs) > 0 {
+		// new peers may have been granted access to the network, notify p2p service to refresh its peer list
+		go g.sl.P2P().RefreshPeerList()
+	}
+
+	rollupTxs := g.sl.L1Publisher().ExtractRollupTxs(block)
+	for _, rollup := range rollupTxs {
+		r, err := common.DecodeRollup(rollup.Rollup)
+		if err != nil {
+			g.logger.Error("could not decode rollup.", log.ErrKey, err)
+		}
+		err = g.db.AddRollupHeader(r)
+		if err != nil {
+			g.logger.Error("could not store rollup.", log.ErrKey, err)
+		}
+	}
+}
+
+func (g *Guardian) publishSharedSecretResponses(scrtResponses []*common.ProducedSecretResponse) error {
+	for _, scrtResponse := range scrtResponses {
+		// todo (#1624) - implement proper protocol so only one host responds to these secret requests initially
+		// for now we just have the genesis host respond until the protocol is implemented
+		if !g.hostData.IsGenesis {
+			g.logger.Trace("Not genesis node, not publishing response to secret request.",
+				"requester", scrtResponse.RequesterID)
+			return nil
+		}
+
+		err := g.sl.L1Publisher().PublishSecretResponse(scrtResponse)
+		if err != nil {
+			return errors.Wrap(err, "could not publish secret response")
+		}
+	}
+	return nil
+}
+
+func (g *Guardian) submitL2Batch(batch *common.ExtBatch) error {
+	g.submitDataLock.Lock()
+	err := g.enclaveClient.SubmitBatch(batch)
+	g.submitDataLock.Unlock()
+	if err != nil {
+		// something went wrong, return error and let the main loop check status and try again when appropriate
+		return errors.Wrap(err, "could not submit L2 batch to enclave")
+	}
+	// successfully processed batch, update the state
+	g.state.OnProcessedBatch(batch.Header.SequencerOrderNo)
+	return nil
+}
+
+func (g *Guardian) periodicBatchProduction() {
+	defer g.logger.Info("Stopping batch production")
+
+	interval := g.batchInterval
+	if interval == 0 {
+		interval = 1 * time.Second
+	}
+	batchProdTicker := time.NewTicker(interval)
+	// attempt to produce a batch every time the timer ticks until we are stopped/interrupted
+	for {
+		if !g.running.Load() {
+			batchProdTicker.Stop()
+			return // stop periodic batch production
+		}
+		select {
+		case <-batchProdTicker.C:
+			if !g.state.InSyncWithL1() {
+				// if we're behind the L1, we don't want to produce batches
+				g.logger.Debug("skipping batch production because L1 is not up to date")
+				continue
+			}
+			g.logger.Debug("create batch")
+			err := g.enclaveClient.CreateBatch()
+			if err != nil {
+				g.logger.Error("unable to produce batch", log.ErrKey, err)
+			}
+		case <-g.hostInterrupter.Done():
+			// interrupted - end periodic process
+			batchProdTicker.Stop()
+			return
+		}
+	}
+}
+
+func (g *Guardian) periodicRollupProduction() {
+	defer g.logger.Info("Stopping rollup production")
+
+	interval := g.rollupInterval
+	if interval == 0 {
+		interval = 3 * time.Second
+	}
+	rollupTicker := time.NewTicker(interval)
+	// attempt to produce a rollup every time the timer ticks until we are stopped/interrupted
+	for {
+		if !g.running.Load() {
+			rollupTicker.Stop()
+			return
// stop periodic rollup production + } + select { + case <-rollupTicker.C: + if !g.state.IsUpToDate() { + // if we're behind the L1, we don't want to produce rollups + g.logger.Debug("skipping rollup production because L1 is not up to date", "state", g.state) + continue + } + lastBatchNo, err := g.sl.L1Publisher().FetchLatestSeqNo() + if err != nil { + g.logger.Warn("encountered error while trying to retrieve latest sequence number", log.ErrKey, err) + continue + } + producedRollup, err := g.enclaveClient.CreateRollup(lastBatchNo.Uint64()) + if err != nil { + g.logger.Error("unable to produce rollup", log.ErrKey, err) + } else { + g.sl.L1Publisher().PublishRollup(producedRollup) + } + case <-g.hostInterrupter.Done(): + // interrupted - end periodic process + rollupTicker.Stop() + return + } + } +} + +func (g *Guardian) streamEnclaveData() { + defer g.logger.Info("Stopping enclave data stream") + g.logger.Info("Starting L2 update stream from enclave") + + streamChan, stop := g.enclaveClient.StreamL2Updates() + var lastBatch *common.ExtBatch + for { + select { + case resp, ok := <-streamChan: + if !ok { + stop() + g.logger.Warn("Batch streaming failed. Reconnecting after 3 seconds") + time.Sleep(3 * time.Second) + streamChan, stop = g.enclaveClient.StreamL2Updates() + + continue + } + + if resp.Batch != nil { + lastBatch = resp.Batch + g.logger.Trace("Received batch from stream", log.BatchHashKey, lastBatch.Hash()) + err := g.sl.L2Repo().AddBatch(resp.Batch) + if err != nil && !errors.Is(err, errutil.ErrAlreadyExists) { + // todo (@matt) this is a catastrophic scenario, the host may never get that batch - handle this + g.logger.Crit("failed to add batch to L2 repo", log.BatchHashKey, resp.Batch.Hash(), log.ErrKey, err) + } + + if g.hostData.IsSequencer { // if we are the sequencer we need to broadcast this new batch to the network + g.logger.Info("Batch produced", log.BatchHeightKey, resp.Batch.Header.Number, log.BatchHashKey, resp.Batch.Hash()) + + err = g.sl.P2P().BroadcastBatches([]*common.ExtBatch{resp.Batch}) + if err != nil { + g.logger.Error("failed to broadcast batch", log.BatchHashKey, resp.Batch.Hash(), log.ErrKey, err) + } + } + g.logger.Info("Batch streamed", log.BatchHeightKey, resp.Batch.Header.Number, log.BatchHashKey, resp.Batch.Hash()) + g.state.OnProcessedBatch(resp.Batch.Header.SequencerOrderNo) + } + + if resp.Logs != nil { + g.sl.LogSubs().SendLogsToSubscribers(&resp.Logs) + } + + case <-time.After(1 * time.Second): + if !g.running.Load() { + // guardian service is stopped + return + } + + case <-g.hostInterrupter.Done(): + // interrupted - end periodic process + return + } + } +} diff --git a/go/host/enclave/service.go b/go/host/enclave/service.go new file mode 100644 index 0000000000..a4cdfcbb9e --- /dev/null +++ b/go/host/enclave/service.go @@ -0,0 +1,118 @@ +package enclave + +import ( + "fmt" + "math/big" + "sync/atomic" + + gethlog "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" + "github.com/obscuronet/go-obscuro/go/common" + "github.com/obscuronet/go-obscuro/go/common/errutil" + "github.com/obscuronet/go-obscuro/go/common/host" + "github.com/obscuronet/go-obscuro/go/common/log" + "github.com/obscuronet/go-obscuro/go/responses" +) + +// This private interface enforces the services that the enclaves service depends on +type enclaveServiceLocator interface { + P2P() host.P2P +} + +// Service is a host service that provides access to the enclave(s) - it handles failover, load balancing, circuit breaking when a host has multiple enclaves 
+type Service struct { + hostData host.Identity + sl enclaveServiceLocator + // eventually this service will support multiple enclaves for HA, but currently there's only one + // The service goes via the Guardian to talk to the enclave (because guardian knows if the enclave is healthy etc.) + enclaveGuardian *Guardian + + running atomic.Bool + logger gethlog.Logger +} + +func NewService(hostData host.Identity, serviceLocator enclaveServiceLocator, enclaveGuardian *Guardian, logger gethlog.Logger) *Service { + return &Service{ + hostData: hostData, + sl: serviceLocator, + enclaveGuardian: enclaveGuardian, + logger: logger, + } +} + +func (e *Service) Start() error { + e.running.Store(true) + return e.enclaveGuardian.Start() +} + +func (e *Service) Stop() error { + e.running.Store(false) + return e.enclaveGuardian.Stop() +} + +func (e *Service) HealthStatus() host.HealthStatus { + if !e.running.Load() { + return &host.BasicErrHealthStatus{ErrMsg: "not running"} + } + + // check the enclave health, which in turn checks the DB health + enclaveHealthy, err := e.enclaveGuardian.enclaveClient.HealthCheck() + if err != nil { + return &host.BasicErrHealthStatus{ErrMsg: fmt.Sprintf("unable to HealthCheck enclave - %s", err.Error())} + } else if !enclaveHealthy { + return &host.BasicErrHealthStatus{ErrMsg: "enclave reported itself as not healthy"} + } + + if !e.enclaveGuardian.GetEnclaveState().InSyncWithL1() { + return &host.BasicErrHealthStatus{ErrMsg: "enclave not in sync with L1"} + } + + // empty error msg means healthy + return &host.BasicErrHealthStatus{ErrMsg: ""} +} + +// LookupBatchBySeqNo is used to fetch batch data from the enclave - it is only used as a fallback for the sequencer +// host if it's missing a batch (other host services should use L2Repo to fetch batch data) +func (e *Service) LookupBatchBySeqNo(seqNo *big.Int) (*common.ExtBatch, error) { + state := e.enclaveGuardian.GetEnclaveState() + if state.GetEnclaveL2Head().Cmp(seqNo) < 0 { + return nil, errutil.ErrNotFound + } + client := e.enclaveGuardian.GetEnclaveClient() + return client.GetBatchBySeqNo(seqNo.Uint64()) +} + +func (e *Service) GetEnclaveClient() common.Enclave { + return e.enclaveGuardian.GetEnclaveClient() +} + +func (e *Service) SubmitAndBroadcastTx(encryptedParams common.EncryptedParamsSendRawTx) (*responses.RawTx, error) { + encryptedTx := common.EncryptedTx(encryptedParams) + + enclaveResponse, sysError := e.enclaveGuardian.GetEnclaveClient().SubmitTx(encryptedTx) + if sysError != nil { + e.logger.Warn("Could not submit transaction due to sysError.", log.ErrKey, sysError) + return nil, sysError + } + if enclaveResponse.Error() != nil { + e.logger.Trace("Could not submit transaction.", log.ErrKey, enclaveResponse.Error()) + return enclaveResponse, nil //nolint: nilerr + } + + if !e.hostData.IsSequencer { + err := e.sl.P2P().SendTxToSequencer(encryptedTx) + if err != nil { + return nil, fmt.Errorf("could not broadcast transaction to sequencer. 
Cause: %w", err) + } + } + + return enclaveResponse, nil +} + +func (e *Service) Subscribe(id rpc.ID, encryptedParams common.EncryptedParamsLogSubscription) error { + return e.enclaveGuardian.GetEnclaveClient().Subscribe(id, encryptedParams) +} + +func (e *Service) Unsubscribe(id rpc.ID) error { + return e.enclaveGuardian.GetEnclaveClient().Unsubscribe(id) +} diff --git a/go/host/enclave/state.go b/go/host/enclave/state.go index 29a86cb9c6..9ffd80c195 100644 --- a/go/host/enclave/state.go +++ b/go/host/enclave/state.go @@ -40,18 +40,21 @@ const ( L2Catchup ) +// when the L2 head is 0 then it means no batch has been seen or processed (first seq number is always 1) +var _noBatch = big.NewInt(0) + func (es Status) String() string { return [...]string{"Live", "Disconnected", "Unavailable", "AwaitingSecret", "L1Catchup", "L2Catchup"}[es] } // StateTracker is the state machine for the enclave type StateTracker struct { - // status is the cached status of the enclave + // status is the status according to this enclave tracker // It is a function of the properties below and recalculated when any of them change status Status // enclave states (updated when enclave returns Status and optimistically after successful actions) - enclaveStatusCode common.StatusCode + enclaveStatusCode common.StatusCode // this is the status code reported by the enclave (Running/AwaitingSecret/Unavailable) enclaveL1Head gethcommon.Hash enclaveL2Head *big.Int @@ -96,9 +99,10 @@ func (s *StateTracker) OnProcessedBatch(enclL2HeadSeqNo *big.Int) { defer s.m.Unlock() if s.hostL2Head == nil || s.hostL2Head.Cmp(enclL2HeadSeqNo) < 0 { // we've successfully processed this batch, so the host's head should be at least as high as the enclave's (this shouldn't happen, we want it to be visible if it happens) - s.logger.Warn("unexpected host head behind enclave head - updating to match", "hostHead", s.hostL2Head, "enclaveHead", enclL2HeadSeqNo) + s.logger.Trace("host head behind enclave head - updating to match", "hostHead", s.hostL2Head, "enclaveHead", enclL2HeadSeqNo) s.hostL2Head = enclL2HeadSeqNo } + s.enclaveL2Head = enclL2HeadSeqNo s.setStatus(s.calculateStatus()) } @@ -146,7 +150,7 @@ func (s *StateTracker) calculateStatus() Status { if s.hostL1Head != s.enclaveL1Head || s.enclaveL1Head == gethutil.EmptyHash { return L1Catchup } - if s.hostL2Head == nil || s.enclaveL2Head == nil || s.hostL2Head.Cmp(s.enclaveL2Head) != 0 { + if s.hostL2Head == nil || s.enclaveL2Head == nil || s.enclaveL2Head.Cmp(_noBatch) == 0 || s.hostL2Head.Cmp(s.enclaveL2Head) > 0 { return L2Catchup } return Live @@ -188,6 +192,6 @@ func (s *StateTracker) setStatus(newStatus Status) { if s.status == newStatus { return } - s.logger.Info(fmt.Sprintf("Updating enclave status from [%s] to [%s]", s.status, newStatus)) + s.logger.Info(fmt.Sprintf("Updating enclave status from [%s] to [%s]", s.status, newStatus), "state", s) s.status = newStatus } diff --git a/go/host/events/logs.go b/go/host/events/logs.go index d52fb95f5e..34307decd2 100644 --- a/go/host/events/logs.go +++ b/go/host/events/logs.go @@ -3,36 +3,67 @@ package events import ( "sync" + "github.com/obscuronet/go-obscuro/go/common/host" + "github.com/obscuronet/go-obscuro/go/common/log" + "github.com/pkg/errors" + gethlog "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" "github.com/obscuronet/go-obscuro/go/common" ) +type logSubsServiceLocator interface { + Enclaves() host.EnclaveService +} + // LogEventManager manages the routing of logs back to their subscribers. 
+// todo (@matt) currently, this operates as a service but maybe it would make more sense to be owned by enclave service? type LogEventManager struct { + sl logSubsServiceLocator subscriptions map[rpc.ID]*subscription // The channels that logs are sent to, one per subscription subscriptionMutex *sync.RWMutex logger gethlog.Logger } -func NewLogEventManager(logger gethlog.Logger) *LogEventManager { +func NewLogEventManager(serviceLocator logSubsServiceLocator, logger gethlog.Logger) *LogEventManager { return &LogEventManager{ + sl: serviceLocator, subscriptions: map[rpc.ID]*subscription{}, subscriptionMutex: &sync.RWMutex{}, logger: logger, } } -// AddSubscription adds a subscription to the set of managed subscriptions. -func (l *LogEventManager) AddSubscription(id rpc.ID, matchedLogsCh chan []byte) { +func (l *LogEventManager) Start() error { + return nil +} + +func (l *LogEventManager) Stop() error { + return nil +} + +func (l *LogEventManager) HealthStatus() host.HealthStatus { + // always healthy for now + return &host.BasicErrHealthStatus{ErrMsg: ""} +} + +func (l *LogEventManager) Subscribe(id rpc.ID, encryptedLogSubscription common.EncryptedParamsLogSubscription, matchedLogsCh chan []byte) error { + err := l.sl.Enclaves().Subscribe(id, encryptedLogSubscription) + if err != nil { + return errors.Wrap(err, "could not create subscription with enclave") + } l.subscriptionMutex.Lock() defer l.subscriptionMutex.Unlock() l.subscriptions[id] = &subscription{ch: matchedLogsCh} + return nil } -// RemoveSubscription removes a subscription from the set of managed subscriptions. -func (l *LogEventManager) RemoveSubscription(id rpc.ID) { +func (l *LogEventManager) Unsubscribe(id rpc.ID) { + err := l.sl.Enclaves().Unsubscribe(id) + if err != nil { + l.logger.Warn("could not terminate enclave subscription", log.ErrKey, err) + } l.subscriptionMutex.Lock() defer l.subscriptionMutex.Unlock() diff --git a/go/host/host.go b/go/host/host.go index 2152f4c57c..f4ebbed6c8 100644 --- a/go/host/host.go +++ b/go/host/host.go @@ -5,28 +5,19 @@ import ( "fmt" "math/big" "os" - "strings" - "sync" - "sync/atomic" - "time" - "github.com/obscuronet/go-obscuro/go/host/l2" + "github.com/kamilsk/breaker" - "github.com/obscuronet/go-obscuro/go/host/l1" - "github.com/pkg/errors" + "github.com/obscuronet/go-obscuro/go/host/l2" "github.com/obscuronet/go-obscuro/go/host/enclave" + "github.com/obscuronet/go-obscuro/go/host/l1" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc" - "github.com/kamilsk/breaker" "github.com/naoina/toml" "github.com/obscuronet/go-obscuro/go/common" - "github.com/obscuronet/go-obscuro/go/common/errutil" "github.com/obscuronet/go-obscuro/go/common/log" - "github.com/obscuronet/go-obscuro/go/common/measure" "github.com/obscuronet/go-obscuro/go/common/profiler" - "github.com/obscuronet/go-obscuro/go/common/retry" "github.com/obscuronet/go-obscuro/go/common/stopcontrol" "github.com/obscuronet/go-obscuro/go/config" "github.com/obscuronet/go-obscuro/go/ethadapter" @@ -42,10 +33,6 @@ import ( hostcommon "github.com/obscuronet/go-obscuro/go/common/host" ) -const ( - maxWaitForSecretResponse = 120 * time.Second -) - type P2PHostService interface { hostcommon.Service hostcommon.P2P @@ -55,42 +42,20 @@ type P2PHostService interface { type host struct { config *config.HostConfig shortID uint64 - services map[string]hostcommon.Service // host services - registered by name for health checks, start and stop - - enclaveClient common.Enclave // For communication with the enclave - 
enclaveState *enclave.StateTracker // StateTracker machine that maintains the current state of enclave (stepping stone to enclave-guardian) - statusReqInFlight atomic.Bool // Flag to ensure that only one enclave status request is sent at a time (the requests are triggered in separate threads when unexpected enclave errors occur) - - // control the host lifecycle - interrupter breaker.Interface - shutdownGroup sync.WaitGroup + services *ServicesRegistry // registry of services that the host manages and makes available // ignore incoming requests stopControl *stopcontrol.StopControl - txP2PCh chan common.EncryptedTx // The channel that new transactions from peers are sent to - - submitDataToEnclaveLock sync.Mutex // host should only submit one block or batch to enclave at a time (probably not needed in enclave-guardian but here for safety for now) - db *db.DB // Stores the host's publicly-available data - logEventManager *events.LogEventManager - logger gethlog.Logger metricRegistry gethmetrics.Registry + interrupter breaker.Interface } -func NewHost( - config *config.HostConfig, - p2p P2PHostService, - ethClient ethadapter.EthClient, - enclaveClient common.Enclave, - ethWallet wallet.Wallet, - mgmtContractLib mgmtcontractlib.MgmtContractLib, - logger gethlog.Logger, - regMetrics gethmetrics.Registry, -) hostcommon.Host { +func NewHost(config *config.HostConfig, hostServices *ServicesRegistry, p2p P2PHostService, ethClient ethadapter.EthClient, enclaveClient common.Enclave, ethWallet wallet.Wallet, mgmtContractLib mgmtcontractlib.MgmtContractLib, logger gethlog.Logger, regMetrics gethmetrics.Registry) hostcommon.Host { database, err := db.CreateDBFromConfig(config, regMetrics, logger) if err != nil { logger.Crit("unable to create database for host", log.ErrKey, err) @@ -106,41 +71,40 @@ func NewHost( } enclStateTracker := enclave.NewStateTracker(logger) enclStateTracker.OnProcessedBlock(l1StartHash) // this makes sure we start streaming from the right block, will be less clunky in the enclave guardian - hostIdentity := hostcommon.NewIdentity(config.ID, config.P2PPublicAddress) + hostIdentity := hostcommon.NewIdentity(config) host := &host{ // config config: config, shortID: common.ShortAddress(config.ID), // services - services: make(map[string]hostcommon.Service), - - // Communication layers. 
- enclaveClient: enclaveClient, - enclaveState: enclStateTracker, - - // incoming data - txP2PCh: make(chan common.EncryptedTx), - - submitDataToEnclaveLock: sync.Mutex{}, + services: hostServices, // Initialize the host DB db: database, - logEventManager: events.NewLogEventManager(logger), - logger: logger, metricRegistry: regMetrics, stopControl: stopcontrol.New(), } - enclService := host // temporarily using the host as the enclService until we have the enclave-guardian wired in - l2Repo := l2.NewBatchRepository(config, p2p, enclService, database, host.logger) - - host.RegisterService(hostcommon.P2PName, p2p) - host.RegisterService(hostcommon.L1BlockRepositoryName, l1Repo) - host.RegisterService(hostcommon.L1PublisherName, l1.NewL1Publisher(hostIdentity, ethWallet, ethClient, mgmtContractLib, l1Repo, logger)) - host.RegisterService(hostcommon.L2BatchRepositoryName, l2Repo) + host.interrupter = breaker.Multiplex( + breaker.BreakBySignal( + os.Kill, + os.Interrupt, + ), + ) + enclGuardian := enclave.NewGuardian(config, hostIdentity, hostServices, enclaveClient, database, host.interrupter, logger) + enclService := enclave.NewService(hostIdentity, hostServices, enclGuardian, logger) + l2Repo := l2.NewBatchRepository(config, hostServices, database, logger) + subsService := events.NewLogEventManager(hostServices, logger) + + hostServices.RegisterService(hostcommon.P2PName, p2p) + hostServices.RegisterService(hostcommon.L1BlockRepositoryName, l1Repo) + hostServices.RegisterService(hostcommon.L1PublisherName, l1.NewL1Publisher(hostIdentity, ethWallet, ethClient, mgmtContractLib, l1Repo, logger)) + hostServices.RegisterService(hostcommon.L2BatchRepositoryName, l2Repo) + hostServices.RegisterService(hostcommon.EnclaveServiceName, enclService) + hostServices.RegisterService(hostcommon.LogSubscriptionServiceName, subsService) var prof *profiler.Profiler if config.ProfilerEnabled { @@ -157,13 +121,6 @@ func NewHost( return host } -func (h *host) RegisterService(name string, service hostcommon.Service) { - if _, ok := h.services[name]; ok { - h.logger.Crit("service already registered", "name", name) - } - h.services[name] = service -} - // Start validates the host config and starts the Host in a go routine - immediately returns after func (h *host) Start() error { if h.stopControl.IsStopping() { @@ -177,102 +134,22 @@ func (h *host) Start() error { ), ) + h.validateConfig() + // start all registered services - for name, service := range h.services { + for name, service := range h.services.All() { err := service.Start() if err != nil { - return errors.Wrapf(err, "could not start service=%s", name) + return fmt.Errorf("could not start service=%s: %w", name, err) } } - h.validateConfig() - tomlConfig, err := toml.Marshal(h.config) if err != nil { return fmt.Errorf("could not print host config - %w", err) } h.logger.Info("Host started with following config", log.CfgKey, string(tomlConfig)) - go func() { - // wait for the Enclave to be available - enclStatus := h.waitForEnclave() - - // todo (#1474) - the host should only connect to enclaves with the same ID as the host.ID - if enclStatus.StatusCode == common.AwaitingSecret { - err = h.requestSecret() - if err != nil { - h.logger.Crit("Could not request secret", log.ErrKey, err.Error()) - } - } - - err := h.refreshP2PPeerList() - if err != nil { - h.logger.Warn("unable to sync current p2p peer list on startup", log.ErrKey, err) - } - - // start the host's main processing loop - h.startProcessing() - }() - - return nil -} - -// HandleBlock is called by the 
L1 repository. The host is subscribed to receive new blocks. -func (h *host) HandleBlock(block *types.Block) { - h.logger.Debug("Received L1 block", log.BlockHashKey, block.Hash(), log.BlockHeightKey, block.Number()) - // record the newest block we've seen - h.enclaveState.OnReceivedBlock(block.Hash()) - if !h.enclaveState.InSyncWithL1() { - return // ignore blocks until we're up-to-date - } - h.submitDataToEnclaveLock.Lock() - defer h.submitDataToEnclaveLock.Unlock() - err := h.processL1Block(block, true, false) - if err != nil { - h.logger.Warn("failure processing L1 block", log.ErrKey, err) - } -} - -// HandleBatch is called by the L2 repository when a new batch arrives -func (h *host) HandleBatch(batch *common.ExtBatch) { - h.logger.Debug("Received L2 batch", log.BatchHashKey, batch.Hash(), log.BatchSeqNoKey, batch.Header.SequencerOrderNo) - // record the newest batch we've seen - h.enclaveState.OnReceivedBatch(batch.Header.SequencerOrderNo) - if !h.enclaveState.IsUpToDate() { - return // ignore batches until we're up-to-date - } - h.submitDataToEnclaveLock.Lock() - defer h.submitDataToEnclaveLock.Unlock() - err := h.enclaveClient.SubmitBatch(batch) - if err != nil { - h.logger.Warn("error submitting batch to enclave", log.ErrKey, err) - } else { - h.enclaveState.OnProcessedBatch(batch.Header.SequencerOrderNo) - } -} - -func (h *host) generateAndBroadcastSecret() error { - h.logger.Info("Node is genesis node. Broadcasting secret.") - // Create the shared secret and submit it to the management contract for storage - attestation, err := h.enclaveClient.Attestation() - if err != nil { - return fmt.Errorf("could not retrieve attestation from enclave. Cause: %w", err) - } - if attestation.Owner != h.config.ID { - return fmt.Errorf("genesis node has ID %s, but its enclave produced an attestation using ID %s", h.config.ID.Hex(), attestation.Owner.Hex()) - } - - secret, err := h.enclaveClient.GenerateSecret() - if err != nil { - return fmt.Errorf("could not generate secret. Cause: %w", err) - } - - err = h.l1Publisher().InitializeSecret(attestation, secret) - if err != nil { - return errors.Wrap(err, "failed to initialise enclave secret") - } - h.logger.Info("Node is genesis node. Secret was broadcast.") - h.enclaveState.OnSecretProvided() return nil } @@ -285,60 +162,28 @@ func (h *host) DB() *db.DB { } func (h *host) EnclaveClient() common.Enclave { - return h.enclaveClient + return h.services.Enclaves().GetEnclaveClient() } func (h *host) SubmitAndBroadcastTx(encryptedParams common.EncryptedParamsSendRawTx) (*responses.RawTx, error) { if h.stopControl.IsStopping() { return nil, responses.ToInternalError(fmt.Errorf("requested SubmitAndBroadcastTx with the host stopping")) } - encryptedTx := common.EncryptedTx(encryptedParams) - - enclaveResponse, sysError := h.enclaveClient.SubmitTx(encryptedTx) - if sysError != nil { - h.logger.Warn("Could not submit transaction due to sysError.", log.ErrKey, sysError) - return nil, sysError - } - if enclaveResponse.Error() != nil { - h.logger.Trace("Could not submit transaction.", log.ErrKey, enclaveResponse.Error()) - return enclaveResponse, nil //nolint: nilerr - } - - if h.config.NodeType != common.Sequencer { - err := h.p2p().SendTxToSequencer(encryptedTx) - if err != nil { - return nil, fmt.Errorf("could not broadcast transaction to sequencer. 
Cause: %w", err) - } - } - - return enclaveResponse, nil -} - -func (h *host) HandleTransaction(tx common.EncryptedTx) { - h.txP2PCh <- tx + return h.services.Enclaves().SubmitAndBroadcastTx(encryptedParams) } func (h *host) Subscribe(id rpc.ID, encryptedLogSubscription common.EncryptedParamsLogSubscription, matchedLogsCh chan []byte) error { if h.stopControl.IsStopping() { return responses.ToInternalError(fmt.Errorf("requested Subscribe with the host stopping")) } - err := h.EnclaveClient().Subscribe(id, encryptedLogSubscription) - if err != nil { - return fmt.Errorf("could not create subscription with enclave. Cause: %w", err) - } - h.logEventManager.AddSubscription(id, matchedLogsCh) - return nil + return h.services.LogSubs().Subscribe(id, encryptedLogSubscription, matchedLogsCh) } func (h *host) Unsubscribe(id rpc.ID) { if h.stopControl.IsStopping() { h.logger.Error("requested Subscribe with the host stopping") } - err := h.EnclaveClient().Unsubscribe(id) - if err != nil { - h.logger.Error("could not terminate subscription", log.SubIDKey, id, log.ErrKey, err) - } - h.logEventManager.RemoveSubscription(id) + h.services.LogSubs().Unsubscribe(id) } func (h *host) Stop() error { @@ -347,25 +192,14 @@ func (h *host) Stop() error { h.logger.Info("Host received a stop command. Attempting shutdown...") h.interrupter.Close() - h.shutdownGroup.Wait() // stop all registered services - for name, service := range h.services { + for name, service := range h.services.All() { if err := service.Stop(); err != nil { h.logger.Error("failed to stop service", "service", name, log.ErrKey, err) } } - // Leave some time for all processing to finish before exiting the main loop. - time.Sleep(time.Second) - - if err := h.enclaveClient.Stop(); err != nil { - h.logger.Error("failed to stop enclave client", log.ErrKey, err) - } - if err := h.enclaveClient.StopClient(); err != nil { - h.logger.Error("failed to stop enclave client", log.ErrKey, err) - } - if err := h.db.Stop(); err != nil { h.logger.Error("failed to stop DB", log.ErrKey, err) } @@ -380,402 +214,22 @@ func (h *host) HealthCheck() (*hostcommon.HealthCheck, error) { return nil, responses.ToInternalError(fmt.Errorf("requested HealthCheck with the host stopping")) } - overallHostHealth := true healthErrors := make([]string, 0) - // check the enclave health, which in turn checks the DB health - enclaveHealthy, err := h.enclaveClient.HealthCheck() - if err != nil { - overallHostHealth = false - healthErrors = append(healthErrors, fmt.Sprintf("unable to HealthCheck enclave - %s", err.Error())) - - // simplest iteration, log the error and just return that it's not healthy - h.logger.Warn("unable to HealthCheck enclave", log.ErrKey, err) - } else if !enclaveHealthy { - overallHostHealth = false - healthErrors = append(healthErrors, "enclave reported itself as not healthy") - } - // loop through all registered services and collect their health statuses - for name, service := range h.services { + for name, service := range h.services.All() { status := service.HealthStatus() if !status.OK() { - overallHostHealth = false healthErrors = append(healthErrors, fmt.Sprintf("[%s] not healthy - %s", name, status.Message())) } } - if !h.enclaveState.InSyncWithL1() { - overallHostHealth = false - healthErrors = append(healthErrors, "not in sync with L1") - } - return &hostcommon.HealthCheck{ - OverallHealth: overallHostHealth, + OverallHealth: len(healthErrors) == 0, Errors: healthErrors, }, nil } -// Waits for enclave to be available, printing a wait message every two 
seconds. -func (h *host) waitForEnclave() common.Status { - counter := 0 - var status common.Status - var err error - for status, err = h.enclaveClient.Status(); err != nil; { - if counter >= 20 { - h.logger.Info(fmt.Sprintf("Waiting for enclave on %s. Latest connection attempt failed", h.config.EnclaveRPCAddress), log.ErrKey, err) - counter = 0 - } - - time.Sleep(100 * time.Millisecond) - counter++ - } - h.logger.Info("Connected to enclave service.", "enclaveStatus", status) - return status -} - -// starts the host main processing loop -func (h *host) startProcessing() { - h.p2p().SubscribeForTx(h) - if h.config.NodeType == common.Sequencer { - go h.startBatchProduction() // periodically request a new batch from enclave - go h.startRollupProduction() // periodically request a new rollup from enclave - } - - h.l1Repo().Subscribe(h) // start receiving new L1 blocks as they arrive (they'll be ignored if we're still behind) - h.l2Repo().Subscribe(h) // start receiving new L2 batches as they arrive (they'll be ignored if we're still behind) - go h.startBatchStreaming() // streams batches and events from the enclave. - - // Main Processing Loop - - // - Process new blocks from the L1 node - // - Process new Transactions gossiped from L2 Peers - for { - // if enclave is behind the L1 head then this will process the next block it needs. Just one block and then defer - // to the select to see if there's anything waiting to process. - // todo (@matt) this looping with sleep method is temporary while we still have queues for p2p, step towards enclave-guardian PR - catchingUp := h.catchUpL1Block() - loopTime := 10 * time.Millisecond - if catchingUp { - // we're catching up on L1 blocks, so we don't want to sleep we want to keep processing - loopTime = 0 - } else { // we're caught up on L1 blocks, now make sure we're caught up on L2 batches (again, just one batch at a time) - catchingUp = h.catchUpL2Batch() - if catchingUp { - // we're catching up on L2 batches, so we don't want to sleep we want to keep processing - loopTime = 0 - } - } - // check if anything is waiting for the host to process before looping back to the catch-up - select { - case tx := <-h.txP2PCh: - resp, sysError := h.enclaveClient.SubmitTx(tx) - if sysError != nil { - h.logger.Warn("Could not submit transaction due to sysError", log.ErrKey, sysError) - continue - } - if resp.Error() != nil { - h.logger.Trace("Could not submit transaction", log.ErrKey, resp.Error()) - } - - case <-h.interrupter.Done(): - return - - case <-time.After(loopTime): - // todo (@matt) this is temporary, step towards enclave-guardian PR, ensures we look back to catching up - } - } -} - -// processL1Block processes the transactions in the given block, and submits them to the enclave for ingestion. 
-// the 'isFork' flag affects the retry behaviour -func (h *host) processL1Block(block *types.Block, isLatestBlock bool, isFork bool) error { - // For the genesis block the parent is nil - if block == nil { - return nil - } - - h.logger.Info("Processing L1 block", log.BlockHashKey, block.Hash(), log.BlockHeightKey, block.Number(), "isLatestBlock", isLatestBlock) - h.processL1BlockTransactions(block) - - // submit each block to the enclave for ingestion plus validation - blockSubmissionResponse, err := h.enclaveClient.SubmitL1Block(*block, h.l1Repo().FetchReceipts(block), isLatestBlock) - if err != nil { - // if we're processing a block on a different fork and the enclave has already seen it then we try the next one - if isFork && strings.Contains(err.Error(), errutil.ErrBlockAlreadyProcessed.Error()) { - // Note: if the block is known to be a fork from the previous then we will retry the next canonical block - // every time there is an 'already processed' failure. This is because we have rewound to replay from the - // latest canonical ancestor but if we are revisiting an old fork we might need to replay more than one block - nextHeight := big.NewInt(0).Add(block.Number(), big.NewInt(1)) - nextBlock, err := h.l1Repo().FetchBlockByHeight(nextHeight) - if err != nil { - return fmt.Errorf("failed to fetch next block after forking block %s. Cause: %w", block.Hash(), err) - } - h.logger.Debug("block already processed after forking, we will try next canonical block", - log.BlockHashKey, nextBlock.Hash(), log.BlockHeightKey, nextBlock.Number()) - return h.processL1Block(nextBlock, isLatestBlock, true) - } - go h.checkEnclaveStatus() - return fmt.Errorf("did not ingest block %s. Cause: %w", block.Hash(), err) - } - h.enclaveState.OnProcessedBlock(block.Hash()) - if blockSubmissionResponse == nil { - return fmt.Errorf("no block submission response given for a submitted l1 block") - } - - err = h.db.AddBlockHeader(block.Header()) - if err != nil { - return fmt.Errorf("submitted block to enclave but could not store the block processing result. 
Cause: %w", err) - } - - err = h.publishSharedSecretResponses(blockSubmissionResponse.ProducedSecretResponses) - if err != nil { - h.logger.Error("failed to publish response to secret request", log.ErrKey, err) - } - - return nil -} - -// React to Obscuro L1 transactions if needed -func (h *host) processL1BlockTransactions(b *types.Block) { - // if there are any secret responses in the block we should refresh our P2P list to re-sync with the network - respTxs := h.l1Publisher().ExtractSecretResponses(b) - if len(respTxs) > 0 { - err := h.refreshP2PPeerList() - if err != nil { - h.logger.Error("Failed to update p2p peer list", log.ErrKey, err) - } - } - - rollupTxs := h.l1Publisher().ExtractRollupTxs(b) - for _, rollup := range rollupTxs { - r, err := common.DecodeRollup(rollup.Rollup) - if err != nil { - h.logger.Error("could not decode rollup.", log.ErrKey, err) - } - err = h.DB().AddRollupHeader(r) - if err != nil { - h.logger.Error("could not store rollup.", log.ErrKey, err) - } - } -} - -func (h *host) storeBatch(producedBatch *common.ExtBatch) { - defer h.logger.Info("Batch stored", log.BatchHashKey, producedBatch.Hash(), log.DurationKey, measure.NewStopwatch()) - - h.enclaveState.OnProcessedBatch(producedBatch.Header.SequencerOrderNo) - err := h.l2Repo().AddBatch(producedBatch) - // if batch already exists, we can ignore it (this happens, for example, when enclave finds batches in rollups that host has already seen) - if err != nil && !errors.Is(err, errutil.ErrAlreadyExists) { - // unexpected error storing batch - h.logger.Error("could not store batch", log.BatchHashKey, producedBatch.Hash(), log.ErrKey, err) - } -} - -// Creates a batch based on the rollup and distributes it to all other nodes. -func (h *host) storeAndDistributeBatch(producedBatch *common.ExtBatch) { - defer h.logger.Info("Batch stored and distributed", log.BatchHashKey, producedBatch.Hash(), log.DurationKey, measure.NewStopwatch()) - - h.storeBatch(producedBatch) - - err := h.p2p().BroadcastBatches([]*common.ExtBatch{producedBatch}) - if err != nil { - h.logger.Error("could not broadcast batch", log.BatchHashKey, producedBatch.Hash(), log.ErrKey, err) - } -} - -// This method implements the procedure by which a node obtains the secret -func (h *host) requestSecret() error { - if h.config.IsGenesis { - return h.generateAndBroadcastSecret() - } - h.logger.Info("Requesting secret.") - att, err := h.enclaveClient.Attestation() - if err != nil { - return fmt.Errorf("could not retrieve attestation from enclave. 
Cause: %w", err) - } - if att.Owner != h.config.ID { - return fmt.Errorf("host has ID %s, but its enclave produced an attestation using ID %s", h.config.ID.Hex(), att.Owner.Hex()) - } - - // returns the L1 block when the request was published, any response will be after that block - awaitFromBlock, err := h.l1Publisher().RequestSecret(att) - if err != nil { - h.logger.Crit("could not receive the secret", log.ErrKey, err) - } - - // keep checking L1 blocks until we find a secret response for our request or timeout - err = retry.Do(func() error { - nextBlock, _, _, err := h.l1Repo().FetchNextBlock(awaitFromBlock) - if err != nil { - return fmt.Errorf("next block after block=%s not found - %w", awaitFromBlock, err) - } - secretRespTxs := h.l1Publisher().ExtractSecretResponses(nextBlock) - if err != nil { - return fmt.Errorf("could not extract secret responses from block=%s - %w", nextBlock.Hash(), err) - } - for _, s := range secretRespTxs { - if s.RequesterID.Hex() == h.config.ID.Hex() { - err = h.enclaveClient.InitEnclave(s.Secret) - if err != nil { - h.logger.Warn("could not initialize enclave with received secret response", "err", err) - continue // try the next secret response in the block if there are more - } - return nil // successfully initialized enclave with secret - } - } - awaitFromBlock = nextBlock.Hash() - return errors.New("no valid secret received in block") - }, retry.NewTimeoutStrategy(maxWaitForSecretResponse, 500*time.Millisecond)) - if err != nil { - // something went wrong, check the enclave status in case it is an enclave problem and let the main loop try again when appropriate - return errors.Wrap(err, "no valid secret received for enclave") - } - - h.logger.Info("Secret received") - h.enclaveState.OnSecretProvided() - return nil -} - -func (h *host) publishSharedSecretResponses(scrtResponses []*common.ProducedSecretResponse) error { - for _, scrtResponse := range scrtResponses { - // todo (#1624) - implement proper protocol so only one host responds to this secret requests initially - // for now we just have the genesis host respond until protocol implemented - if !h.config.IsGenesis { - h.logger.Trace("Not genesis node, not publishing response to secret request.", - "requester", scrtResponse.RequesterID) - return nil - } - - err := h.l1Publisher().PublishSecretResponse(scrtResponse) - if err != nil { - return errors.Wrap(err, "could not publish secret response") - } - } - return nil -} - -// Whenever we receive a new shared secret response transaction or restart the host, we update our list of P2P peers -func (h *host) refreshP2PPeerList() error { - // We make a call to the L1 node to retrieve the latest list of aggregators - peers, err := h.l1Publisher().FetchLatestPeersList() - if err != nil { - return errors.Wrap(err, "could not fetch latest peers list from L1") - } - - h.p2p().UpdatePeerList(peers) - return nil -} - -func (h *host) startBatchProduction() { - defer h.logger.Info("Stopping batch production") - - h.shutdownGroup.Add(1) - defer h.shutdownGroup.Done() - - interval := h.config.BatchInterval - if interval == 0 { - interval = 1 * time.Second - } - batchProdTicker := time.NewTicker(interval) - for { - select { - case <-batchProdTicker.C: - if !h.enclaveState.InSyncWithL1() { - // if we're behind the L1, we don't want to produce batches - h.logger.Debug("skipping batch production because L1 is not up to date") - continue - } - h.logger.Debug("create batch") - err := h.enclaveClient.CreateBatch() - if err != nil { - h.logger.Error("unable to produce 
batch", log.ErrKey, err) - } - case <-h.interrupter.Done(): - return - } - } -} - -func (h *host) startBatchStreaming() { - defer h.logger.Info("Stopping batch streaming") - - h.shutdownGroup.Add(1) - defer h.shutdownGroup.Done() - - h.logger.Info("Starting L2 update stream from enclave") - streamChan, stop := h.enclaveClient.StreamL2Updates() - var lastBatch *common.ExtBatch - for { - select { - case <-h.interrupter.Done(): - stop() - return - case resp, ok := <-streamChan: - if !ok { - stop() - h.logger.Warn("Batch streaming failed. Reconnecting from latest received batch after 3 seconds") - time.Sleep(3 * time.Second) - streamChan, stop = h.enclaveClient.StreamL2Updates() - - continue - } - - if resp.Batch != nil { - lastBatch = resp.Batch - h.logger.Trace("Received batch from stream", log.BatchHashKey, lastBatch.Hash()) - if h.config.NodeType == common.Sequencer { - h.logger.Info("Batch produced", log.BatchHeightKey, resp.Batch.Header.Number, log.BatchHashKey, resp.Batch.Hash(), log.BatchSeqNoKey, resp.Batch.Header.SequencerOrderNo) - h.enclaveState.OnReceivedBatch(resp.Batch.Header.SequencerOrderNo) - h.storeAndDistributeBatch(resp.Batch) - } else { - h.logger.Info("Batch streamed", log.BatchHeightKey, resp.Batch.Header.Number, log.BatchHashKey, resp.Batch.Hash()) - h.storeBatch(resp.Batch) - } - } - - if resp.Logs != nil { - h.logEventManager.SendLogsToSubscribers(&resp.Logs) - } - } - } -} - -func (h *host) startRollupProduction() { - defer h.logger.Info("Stopping rollup production") - - h.shutdownGroup.Add(1) - defer h.shutdownGroup.Done() - - interval := h.config.RollupInterval - if interval == 0 { - interval = 5 * time.Second - } - rollupTicker := time.NewTicker(interval) - for { - select { - case <-rollupTicker.C: - if !h.enclaveState.IsUpToDate() { - // if we're behind the L1, we don't want to produce rollups - h.logger.Debug("skipping rollup production because L1 is not up to date", "enclaveState", h.enclaveState) - continue - } - lastBatchNo, err := h.l1Publisher().FetchLatestSeqNo() - if err != nil { - h.logger.Warn("encountered error while trying to retrieve latest sequence number", log.ErrKey, err) - continue - } - producedRollup, err := h.enclaveClient.CreateRollup(lastBatchNo.Uint64()) - if err != nil { - h.logger.Error("unable to produce rollup", log.ErrKey, err) - } else { - h.l1Publisher().PublishRollup(producedRollup) - } - case <-h.interrupter.Done(): - return - } - } -} - // Checks the host config is valid. func (h *host) validateConfig() { if h.config.IsGenesis && h.config.NodeType != common.Sequencer { @@ -789,105 +243,3 @@ func (h *host) validateConfig() { h.logger.Crit("the host must specify a public P2P address") } } - -// this function should be fired off in a new goroutine whenever the status of the enclave needs to be verified -// (e.g. 
if we've seen unexpected errors from the enclave client) -func (h *host) checkEnclaveStatus() { - // only allow one status request at a time, if flag is false, atomically swap it to true and continue - if h.statusReqInFlight.CompareAndSwap(false, true) { - defer h.statusReqInFlight.Store(false) // clear flag after request completed - s, err := h.enclaveClient.Status() - if err != nil { - h.logger.Error("could not get enclave status", log.ErrKey, err) - // we record this as a disconnection, we can't get any more info from the enclave about status currently - h.enclaveState.OnDisconnected() - return - } - h.enclaveState.OnEnclaveStatus(s) - } -} - -// returns true if processed a block -func (h *host) catchUpL1Block() bool { - // nothing to do if host is stopping or L1 is up-to-date - if h.stopControl.IsStopping() || h.enclaveState.InSyncWithL1() { - return false - } - prevHead := h.enclaveState.GetEnclaveL1Head() - h.logger.Trace("fetching next block", log.BlockHashKey, prevHead) - block, isLatest, isFork, err := h.l1Repo().FetchNextBlock(prevHead) - if err != nil { - // ErrNoNext block occurs sometimes if we caught up with the L1 head, but other errors are unexpected - if !errors.Is(err, l1.ErrNoNextBlock) { - h.logger.Warn("unexpected error fetching next L1 block", log.ErrKey, err) - } - return false // nothing to do if we can't fetch the next block - } - h.submitDataToEnclaveLock.Lock() - defer h.submitDataToEnclaveLock.Unlock() - err = h.processL1Block(block, isLatest, isFork) - if err != nil { - h.logger.Warn("unable to process L1 block", log.ErrKey, err) - } - return true -} - -// returns true if processed a batch to indicate to the host that we're still catching up (so, false => up-to-date) -func (h *host) catchUpL2Batch() bool { - if h.stopControl.IsStopping() || h.enclaveState.IsUpToDate() { - return false - } - prevHead := h.enclaveState.GetEnclaveL2Head() - nextHead := big.NewInt(0) - if prevHead != nil { - nextHead = prevHead.Add(prevHead, big.NewInt(1)) - } - h.logger.Trace("fetching next batch", log.BatchSeqNoKey, nextHead) - batch, err := h.l2Repo().FetchBatchBySeqNo(nextHead) - if err != nil { - if !errors.Is(err, errutil.ErrNotFound) { - h.logger.Warn("unexpected error fetching next L2 batch", log.ErrKey, err) - } - return false // nothing to do if we can't fetch the next batch, the repo will request it p2p if it's missing, so it should become available soon - } - h.submitDataToEnclaveLock.Lock() - defer h.submitDataToEnclaveLock.Unlock() - if err = h.enclaveClient.SubmitBatch(batch); err != nil { - h.logger.Warn("unable to submit batch to enclave", log.ErrKey, err) - } else { - // successfully submitted, update the enclave state - h.enclaveState.OnProcessedBatch(batch.Header.SequencerOrderNo) - } - return true -} - -func (h *host) LookupBatchBySeqNo(seqNo *big.Int) (*common.ExtBatch, error) { - if h.enclaveState.GetEnclaveL2Head().Cmp(seqNo) < 0 { - return nil, errors.Wrap(errutil.ErrNotFound, "enclave has not received requested batch yet") - } - return h.enclaveClient.GetBatchBySeqNo(seqNo.Uint64()) -} - -func (h *host) getService(name string) hostcommon.Service { - service, ok := h.services[name] - if !ok { - h.logger.Crit("requested service not registered", "name", name) - } - return service -} - -func (h *host) p2p() hostcommon.P2P { - return h.getService(hostcommon.P2PName).(hostcommon.P2P) -} - -func (h *host) l1Repo() hostcommon.L1BlockRepository { - return h.getService(hostcommon.L1BlockRepositoryName).(hostcommon.L1BlockRepository) -} - -func (h *host) 
l1Publisher() hostcommon.L1Publisher { - return h.getService(hostcommon.L1PublisherName).(hostcommon.L1Publisher) -} - -func (h *host) l2Repo() hostcommon.L2BatchRepository { - return h.getService(hostcommon.L2BatchRepositoryName).(hostcommon.L2BatchRepository) -} diff --git a/go/host/l1/blockrepository.go b/go/host/l1/blockrepository.go index 50d8cae10c..74a6ecaf68 100644 --- a/go/host/l1/blockrepository.go +++ b/go/host/l1/blockrepository.go @@ -76,35 +76,38 @@ func (r *Repository) Subscribe(handler host.L1BlockHandler) func() { } // FetchNextBlock calculates the next canonical block that should be sent to requester after a given hash. -// It returns the block, whether it is the latest known head, and whether it has rewound to a fork -func (r *Repository) FetchNextBlock(prevBlockHash gethcommon.Hash) (*types.Block, bool, bool, error) { +// It returns the block and a bool for whether it is the latest known head +func (r *Repository) FetchNextBlock(prevBlockHash gethcommon.Hash) (*types.Block, bool, error) { if prevBlockHash == r.head { // prevBlock is the latest known head - return nil, false, false, ErrNoNextBlock + return nil, false, ErrNoNextBlock } - prevBlock, err := r.ethClient.BlockByHash(prevBlockHash) - if err != nil { - return nil, false, false, fmt.Errorf("could not find prev block with hash=%s - %w", prevBlockHash, err) + + if prevBlockHash == (gethcommon.Hash{}) { + // prevBlock is empty, so we are starting from genesis + blk, err := r.ethClient.BlockByNumber(big.NewInt(0)) + if err != nil { + return nil, false, fmt.Errorf("could not find genesis block - %w", err) + } + return blk, false, nil } + // the latestCanonAncestor will usually return the prevBlock itself but this step is necessary to walk back if there was a fork lca, err := r.latestCanonAncestor(prevBlockHash) if err != nil { - return nil, false, false, err + return nil, false, err } // and send the canonical block at the height after that // (which may be a fork, or it may just be the next on the same branch if we are catching-up) blk, err := r.ethClient.BlockByNumber(increment(lca.Number())) if err != nil { if errors.Is(err, ethereum.NotFound) { - return nil, false, false, ErrNoNextBlock + return nil, false, ErrNoNextBlock } - return nil, false, false, fmt.Errorf("could not find block after latest canon ancestor, height=%s - %w", increment(lca.Number()), err) + return nil, false, fmt.Errorf("could not find block after latest canon ancestor, height=%s - %w", increment(lca.Number()), err) } - // if the block we are about to feed is the same height or lower than the previous block, we have rewound onto a fork - isFork := prevBlock.Header().Number.Cmp(blk.Number()) >= 0 - - return blk, blk.Hash() == r.head, isFork, nil + return blk, blk.Hash() == r.head, nil } func (r *Repository) latestCanonAncestor(blkHash gethcommon.Hash) (*types.Block, error) { @@ -164,7 +167,9 @@ func (r *Repository) streamLiveBlocks() { } } - streamSub.Unsubscribe() + if streamSub != nil { + streamSub.Unsubscribe() + } } func (r *Repository) resetLiveStream() (chan *types.Header, ethereum.Subscription) { diff --git a/go/host/l1/publisher.go b/go/host/l1/publisher.go index bf7dd03b5f..3afdfb7cbc 100644 --- a/go/host/l1/publisher.go +++ b/go/host/l1/publisher.go @@ -279,7 +279,7 @@ func (p *Publisher) signAndBroadcastL1Tx(tx types.TxData, tries uint64, awaitRec return p.ethClient.SendTransaction(signedTx) }, retry.NewDoublingBackoffStrategy(time.Second, tries)) // doubling retry wait (3 tries = 7sec, 7 tries = 63sec) if err != nil { - return 
errors.Wrapf(err, "could not broadcast L1 tx after %d tries", tries) + return fmt.Errorf("could not broadcast L1 tx after %d tries: %w", tries, err) } p.logger.Info("Successfully submitted tx to L1", "txHash", signedTx.Hash()) @@ -308,7 +308,7 @@ func (p *Publisher) waitForReceipt(txHash common.TxHash) error { receipt, err = p.ethClient.TransactionReceipt(txHash) if err != nil { // adds more info on the error - return errors.Wrapf(err, "could not get receipt for L1 tx=%s", txHash) + return fmt.Errorf("could not get receipt for L1 tx=%s: %w", txHash, err) } return err }, diff --git a/go/host/l2/batchrepository.go b/go/host/l2/batchrepository.go index 7ab1daaf6b..02d2e94205 100644 --- a/go/host/l2/batchrepository.go +++ b/go/host/l2/batchrepository.go @@ -24,13 +24,18 @@ const ( _timeoutWaitingForP2PResponse = 30 * time.Second ) +// This private interface enforces the services that the batch repository depends on +type batchRepoServiceLocator interface { + P2P() host.P2P + Enclaves() host.EnclaveService +} + // Repository is responsible for storing and retrieving batches from the database // If it can't find a batch it will request it from peers. It also subscribes for batch requests from peers and responds to them. type Repository struct { subscribers []host.L2BatchHandler - p2p host.P2P - enclave host.EnclaveService + sl batchRepoServiceLocator db *db.DB isSequencer bool @@ -49,10 +54,9 @@ type Repository struct { logger gethlog.Logger } -func NewBatchRepository(cfg *config.HostConfig, p2p host.P2P, enclave host.EnclaveService, database *db.DB, logger gethlog.Logger) *Repository { +func NewBatchRepository(cfg *config.HostConfig, hostService batchRepoServiceLocator, database *db.DB, logger gethlog.Logger) *Repository { return &Repository{ - p2p: p2p, - enclave: enclave, + sl: hostService, db: database, isSequencer: cfg.NodeType == common.Sequencer, latestBatchSeqNo: big.NewInt(0), @@ -65,8 +69,8 @@ func (r *Repository) Start() error { r.running.Store(true) // register ourselves for new batches from p2p - r.p2p.SubscribeForBatches(r) - r.p2p.SubscribeForBatchRequests(r) + r.sl.P2P().SubscribeForBatches(r) + r.sl.P2P().SubscribeForBatchRequests(r) return nil } @@ -136,7 +140,7 @@ func (r *Repository) HandleBatchRequest(requesterID string, fromSeqNo *big.Int) return // nothing to send } - err := r.p2p.RespondToBatchRequest(requesterID, batches) + err := r.sl.P2P().RespondToBatchRequest(requesterID, batches) if err != nil { r.logger.Warn("unable to send batches to peer", "peer", requesterID, log.ErrKey, err) } @@ -170,6 +174,7 @@ func (r *Repository) FetchBatchBySeqNo(seqNo *big.Int) (*common.ExtBatch, error) // - when the node is a validator to store batches read from roll-ups // If the repository already has the batch it returns an AlreadyExists error which is typically ignored.
func (r *Repository) AddBatch(batch *common.ExtBatch) error { + r.logger.Info("adding batch to L2 batch repository", "seqNo", batch.Header.SequencerOrderNo) err := r.db.AddBatch(batch) if err != nil { return err @@ -184,7 +189,7 @@ func (r *Repository) AddBatch(batch *common.ExtBatch) error { } func (r *Repository) fetchBatchFallbackToEnclave(seqNo *big.Int) (*common.ExtBatch, error) { - b, err := r.enclave.LookupBatchBySeqNo(seqNo) + b, err := r.sl.Enclaves().LookupBatchBySeqNo(seqNo) if err != nil { return nil, err } @@ -212,7 +217,7 @@ func (r *Repository) requestMissingBatchesFromPeers(fromSeqNo *big.Int) { } r.logger.Debug("requesting missing batches from sequencer", "fromSeqNo", fromSeqNo) - err := r.p2p.RequestBatchesFromSequencer(fromSeqNo) + err := r.sl.P2P().RequestBatchesFromSequencer(fromSeqNo) if err != nil { r.logger.Warn("unable to request missing batches from sequencer", "fromSeqNo", fromSeqNo, log.ErrKey, err) return diff --git a/go/host/p2p/p2p.go b/go/host/p2p/p2p.go index 140d054328..1fc71acc9c 100644 --- a/go/host/p2p/p2p.go +++ b/go/host/p2p/p2p.go @@ -48,18 +48,27 @@ type message struct { Contents []byte } +type p2pServiceLocator interface { + L1Publisher() host.L1Publisher + L2Repo() host.L2BatchRepository +} + // NewSocketP2PLayer - returns the Socket implementation of the P2P -func NewSocketP2PLayer(config *config.HostConfig, logger gethlog.Logger, metricReg gethmetrics.Registry) *Service { +func NewSocketP2PLayer(config *config.HostConfig, serviceLocator p2pServiceLocator, logger gethlog.Logger, metricReg gethmetrics.Registry) *Service { return &Service{ batchSubscribers: subscription.NewManager[host.P2PBatchHandler](), txSubscribers: subscription.NewManager[host.P2PTxHandler](), batchReqHandlers: subscription.NewManager[host.P2PBatchRequestHandler](), + sl: serviceLocator, + isSequencer: config.NodeType == common.Sequencer, ourAddress: config.P2PBindAddress, peerAddresses: []string{}, p2pTimeout: config.P2PConnectionTimeout, + refreshingPeersMx: sync.Mutex{}, + // monitoring peerTracker: newPeerTracker(), metricsRegistry: metricReg, @@ -75,21 +84,24 @@ type Service struct { listener net.Listener listenerInterrupt *int32 // A value of 1 indicates that new connections should not be accepted + sl p2pServiceLocator + isSequencer bool ourAddress string peerAddresses []string p2pTimeout time.Duration - peerTracker *peerTracker - metricsRegistry gethmetrics.Registry - logger gethlog.Logger + peerTracker *peerTracker + metricsRegistry gethmetrics.Registry + logger gethlog.Logger + refreshingPeersMx sync.Mutex } func (p *Service) Start() error { // We listen for P2P connections. 
listener, err := net.Listen("tcp", p.ourAddress) if err != nil { - return errors.Wrapf(err, "could not listen for P2P connections on %s", p.ourAddress) + return fmt.Errorf("could not listen for P2P connections on %s: %w", p.ourAddress, err) } p.logger.Info(fmt.Sprintf("Started listening on port: %s", p.ourAddress)) @@ -134,7 +146,14 @@ func (p *Service) SubscribeForBatchRequests(handler host.P2PBatchRequestHandler) return p.batchReqHandlers.Subscribe(handler) } -func (p *Service) UpdatePeerList(newPeers []string) { +func (p *Service) RefreshPeerList() { + p.refreshingPeersMx.Lock() + defer p.refreshingPeersMx.Unlock() + newPeers, err := p.sl.L1Publisher().FetchLatestPeersList() + if err != nil { + p.logger.Error(fmt.Sprintf("unable to fetch latest peer list from L1 - %s", err.Error())) + return + } p.logger.Info(fmt.Sprintf("Updated peer list - old: %s new: %s", p.peerAddresses, newPeers)) p.peerAddresses = newPeers } diff --git a/go/host/rpc/enclaverpc/enclave_client.go b/go/host/rpc/enclaverpc/enclave_client.go index 8b550db9db..0331a304fc 100644 --- a/go/host/rpc/enclaverpc/enclave_client.go +++ b/go/host/rpc/enclaverpc/enclave_client.go @@ -192,6 +192,7 @@ func (c *Client) SubmitBatch(batch *common.ExtBatch) common.SystemError { defer cancel() batchMsg := rpc.ToExtBatchMsg(batch) + response, err := c.protoClient.SubmitBatch(timeoutCtx, &generated.SubmitBatchRequest{Batch: &batchMsg}) if err != nil { return syserr.NewRPCError(err) diff --git a/go/host/servicelocator.go b/go/host/servicelocator.go new file mode 100644 index 0000000000..3a01b23ba3 --- /dev/null +++ b/go/host/servicelocator.go @@ -0,0 +1,61 @@ +package host + +import ( + "github.com/ethereum/go-ethereum/log" + hostcommon "github.com/obscuronet/go-obscuro/go/common/host" +) + +type ServicesRegistry struct { + services map[string]hostcommon.Service + logger log.Logger +} + +func NewServicesRegistry(logger log.Logger) *ServicesRegistry { + return &ServicesRegistry{ + services: make(map[string]hostcommon.Service), + logger: logger, + } +} + +func (s *ServicesRegistry) All() map[string]hostcommon.Service { + return s.services +} + +func (s *ServicesRegistry) RegisterService(name string, service hostcommon.Service) { + if _, ok := s.services[name]; ok { + s.logger.Crit("service already registered", "name", name) + } + s.services[name] = service +} + +func (s *ServicesRegistry) getService(name string) hostcommon.Service { + service, ok := s.services[name] + if !ok { + s.logger.Crit("requested service not registered", "name", name) + } + return service +} + +func (s *ServicesRegistry) P2P() hostcommon.P2P { + return s.getService(hostcommon.P2PName).(hostcommon.P2P) +} + +func (s *ServicesRegistry) L1Repo() hostcommon.L1BlockRepository { + return s.getService(hostcommon.L1BlockRepositoryName).(hostcommon.L1BlockRepository) +} + +func (s *ServicesRegistry) L1Publisher() hostcommon.L1Publisher { + return s.getService(hostcommon.L1PublisherName).(hostcommon.L1Publisher) +} + +func (s *ServicesRegistry) L2Repo() hostcommon.L2BatchRepository { + return s.getService(hostcommon.L2BatchRepositoryName).(hostcommon.L2BatchRepository) +} + +func (s *ServicesRegistry) Enclaves() hostcommon.EnclaveService { + return s.getService(hostcommon.EnclaveServiceName).(hostcommon.EnclaveService) +} + +func (s *ServicesRegistry) LogSubs() hostcommon.LogSubscriptionManager { + return s.getService(hostcommon.LogSubscriptionServiceName).(hostcommon.LogSubscriptionManager) +} diff --git a/integration/ethereummock/node.go b/integration/ethereummock/node.go 
index ec1d79e727..9d1cf99379 100644 --- a/integration/ethereummock/node.go +++ b/integration/ethereummock/node.go @@ -135,7 +135,8 @@ func (m *Node) FetchLastBatchSeqNo(gethcommon.Address) (*big.Int, error) { return big.NewInt(int64(rollup.Header.LastBatchSeqNo)), nil } } - return big.NewInt(0), nil + // the first batch is number 1 + return big.NewInt(int64(common.L2GenesisSeqNo)), nil } // BlockListener provides stream of latest mock head headers as they are created diff --git a/integration/networktest/runner.go b/integration/networktest/runner.go index 818f0acb7e..f692e166a7 100644 --- a/integration/networktest/runner.go +++ b/integration/networktest/runner.go @@ -32,8 +32,8 @@ func Run(testName string, t *testing.T, env Environment, action Action) { if err != nil { t.Fatal(err) } - fmt.Println("Verifying test:", testName) time.Sleep(2 * time.Second) // allow time for latest test transactions to propagate todo (@matt) consider how to configure this sleep + fmt.Println("Verifying test:", testName) err = action.Verify(ctx, network) if err != nil { t.Fatal(err) diff --git a/integration/simulation/devnetwork/node.go b/integration/simulation/devnetwork/node.go index 88354408ef..c6bfca0005 100644 --- a/integration/simulation/devnetwork/node.go +++ b/integration/simulation/devnetwork/node.go @@ -6,6 +6,8 @@ import ( "os" "time" + "github.com/obscuronet/go-obscuro/go/host" + "github.com/obscuronet/go-obscuro/go/enclave/storage/init/sqlite" "github.com/obscuronet/go-obscuro/go/ethadapter/mgmtcontractlib" @@ -131,13 +133,14 @@ func (n *InMemNodeOperator) createHostContainer() *hostcontainer.HostContainer { // create a socket P2P layer p2pLogger := hostLogger.New(log.CmpKey, log.P2PCmp) - nodeP2p := p2p.NewSocketP2PLayer(hostConfig, p2pLogger, nil) + svcLocator := host.NewServicesRegistry(n.logger) + nodeP2p := p2p.NewSocketP2PLayer(hostConfig, svcLocator, p2pLogger, nil) // create an enclave client enclaveClient := enclaverpc.NewClient(hostConfig, testlog.Logger().New(log.NodeIDKey, n.operatorIdx)) rpcServer := clientrpc.NewServer(hostConfig, n.logger) mgmtContractLib := mgmtcontractlib.NewMgmtContractLib(&hostConfig.ManagementContractAddress, n.logger) - return hostcontainer.NewHostContainer(hostConfig, nodeP2p, n.l1Client, enclaveClient, mgmtContractLib, n.l1Wallet, rpcServer, hostLogger, metrics.New(false, 0, n.logger)) + return hostcontainer.NewHostContainer(hostConfig, svcLocator, nodeP2p, n.l1Client, enclaveClient, mgmtContractLib, n.l1Wallet, rpcServer, hostLogger, metrics.New(false, 0, n.logger)) } func (n *InMemNodeOperator) createEnclaveContainer() *enclavecontainer.EnclaveContainer { diff --git a/integration/simulation/network/network_utils.go b/integration/simulation/network/network_utils.go index 9ebaa14183..3dd382f221 100644 --- a/integration/simulation/network/network_utils.go +++ b/integration/simulation/network/network_utils.go @@ -6,6 +6,8 @@ import ( "math/big" "time" + "github.com/obscuronet/go-obscuro/go/host" + "github.com/obscuronet/go-obscuro/go/common" "github.com/obscuronet/go-obscuro/go/common/log" "github.com/obscuronet/go-obscuro/go/common/metrics" @@ -93,7 +95,7 @@ func createInMemObscuroNode( hostLogger := testlog.Logger().New(log.NodeIDKey, id, log.CmpKey, log.HostCmp) metricsService := metrics.New(hostConfig.MetricsEnabled, hostConfig.MetricsHTTPPort, hostLogger) - currentContainer := container.NewHostContainer(hostConfig, mockP2P, ethClient, enclaveClient, mgmtContractLib, ethWallet, nil, hostLogger, metricsService) + currentContainer := 
container.NewHostContainer(hostConfig, host.NewServicesRegistry(hostLogger), mockP2P, ethClient, enclaveClient, mgmtContractLib, ethWallet, nil, hostLogger, metricsService) return currentContainer } diff --git a/integration/simulation/p2p/mock_l2_network.go b/integration/simulation/p2p/mock_l2_network.go index b109722b37..b39bf5b3ba 100644 --- a/integration/simulation/p2p/mock_l2_network.go +++ b/integration/simulation/p2p/mock_l2_network.go @@ -184,3 +184,7 @@ func (n *MockP2P) ReceiveBatchRequest(requestID string, fromSeqNo *big.Int) { sub.HandleBatchRequest(requestID, fromSeqNo) } } + +func (n *MockP2P) RefreshPeerList() { + // no-op +} diff --git a/integration/simulation/validate_chain.go b/integration/simulation/validate_chain.go index 181c402c77..23d28fb69c 100644 --- a/integration/simulation/validate_chain.go +++ b/integration/simulation/validate_chain.go @@ -216,7 +216,10 @@ func checkRollups(t *testing.T, s *Simulation, nodeIdx int, rollups []*common.Ex for _, clients := range s.RPCHandles.AuthObsClients { client := clients[0] - batchOnNode, _ := client.BatchHeaderByHash(batchHeader.Hash()) + batchOnNode, err := client.BatchHeaderByHash(batchHeader.Hash()) + if err != nil { + t.Fatalf("Node %d: Could not find batch header [idx=%s, hash=%s]. Cause: %s", nodeIdx, batchHeader.Number, batchHeader.Hash(), err) + } if batchOnNode.Hash() != batchHeader.Hash() { t.Errorf("Node %d: Batches mismatch!", nodeIdx) }
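Reviewer note: a minimal sketch of how the pieces introduced in this diff are expected to fit together, based only on the constructors and accessors visible here (NewServicesRegistry, RegisterService, the named getters, and the updated NewSocketP2PLayer signature). The wiring helper, its package name and parameters, and the registration order are illustrative assumptions, not code from this PR.

package wiring

import (
	gethlog "github.com/ethereum/go-ethereum/log"
	hostcommon "github.com/obscuronet/go-obscuro/go/common/host"
	"github.com/obscuronet/go-obscuro/go/config"
	"github.com/obscuronet/go-obscuro/go/host"
	"github.com/obscuronet/go-obscuro/go/host/p2p"
)

// buildServices is a hypothetical helper showing the service-locator pattern used in this PR:
// the registry is created first and handed to each service constructor, and each service only
// resolves its dependencies through the locator once it is running (e.g. the batch repository
// calls r.sl.P2P() from Start(), and the P2P service calls p.sl.L1Publisher() in RefreshPeerList()).
func buildServices(cfg *config.HostConfig, logger gethlog.Logger) *host.ServicesRegistry {
	sl := host.NewServicesRegistry(logger)

	// The P2P service takes the registry as its narrow p2pServiceLocator (nil metrics registry,
	// as in the in-memory dev network above).
	nodeP2P := p2p.NewSocketP2PLayer(cfg, sl, logger, nil)
	sl.RegisterService(hostcommon.P2PName, nodeP2P)

	// The remaining services (L1 block repo, L1 publisher, L2 batch repo, enclave service,
	// log-subscription manager) would be constructed and registered the same way under
	// hostcommon.L1BlockRepositoryName, hostcommon.L1PublisherName, hostcommon.L2BatchRepositoryName,
	// hostcommon.EnclaveServiceName and hostcommon.LogSubscriptionServiceName.

	return sl
}

Consumers then fetch typed interfaces from the registry, e.g. sl.P2P(), sl.L1Repo() or sl.Enclaves(); getService logs a Crit error if the named service was never registered, so missing registrations fail loudly rather than returning nil.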