diff --git a/go/common/host/services.go b/go/common/host/services.go index 8b76acf2f3..314331efc5 100644 --- a/go/common/host/services.go +++ b/go/common/host/services.go @@ -118,14 +118,20 @@ type L1Publisher interface { // L2BatchRepository provides an interface for the host to request L2 batch data (live-streaming and historical) type L2BatchRepository interface { - // Subscribe will register a batch handler to receive new batches as they arrive - Subscribe(handler L2BatchHandler) func() + // SubscribeNewBatches will register a handler to receive new batches from the publisher as they arrive at the host + SubscribeNewBatches(handler L2BatchHandler) func() + + // SubscribeValidatedBatches will register a handler to receive batches that have been validated by the enclave + SubscribeValidatedBatches(handler L2BatchHandler) func() FetchBatchBySeqNo(background context.Context, seqNo *big.Int) (*common.ExtBatch, error) // AddBatch is used to notify the repository of a new batch, e.g. from the enclave when seq produces one or a rollup is consumed // Note: it is fine to add batches that the repo already has, it will just ignore them AddBatch(batch *common.ExtBatch) error + + // NotifyNewValidatedHead - called after an enclave validates a batch, to update the repo's validated head and notify subscribers + NotifyNewValidatedHead(batch *common.ExtBatch) } // L2BatchHandler is an interface for receiving new batches from the publisher as they arrive diff --git a/go/host/enclave/guardian.go b/go/host/enclave/guardian.go index 83fa49894e..2a4374a869 100644 --- a/go/host/enclave/guardian.go +++ b/go/host/enclave/guardian.go @@ -128,7 +128,7 @@ func (g *Guardian) Start() error { // note: not keeping the unsubscribe functions because the lifespan of the guardian is the same as the host g.sl.L1Repo().Subscribe(g) - g.sl.L2Repo().Subscribe(g) + g.sl.L2Repo().SubscribeNewBatches(g) // start streaming data from the enclave go g.streamEnclaveData() @@ -659,8 +659,10 @@ func (g *Guardian) streamEnclaveData() { g.logger.Error("Failed to broadcast batch", log.BatchHashKey, resp.Batch.Hash(), log.ErrKey, err) } } else { - g.logger.Debug("Received batch from enclave", log.BatchSeqNoKey, resp.Batch.Header.SequencerOrderNo, log.BatchHashKey, resp.Batch.Hash()) + g.logger.Debug("Received validated batch from enclave", log.BatchSeqNoKey, resp.Batch.Header.SequencerOrderNo, log.BatchHashKey, resp.Batch.Hash()) } + // Notify the L2 repo that an enclave has validated a batch, so it can update its validated head and notify subscribers + g.sl.L2Repo().NotifyNewValidatedHead(resp.Batch) g.state.OnProcessedBatch(resp.Batch.Header.SequencerOrderNo) } diff --git a/go/host/host.go b/go/host/host.go index 14b83e5c37..13ed1fe5ed 100644 --- a/go/host/host.go +++ b/go/host/host.go @@ -97,7 +97,7 @@ func NewHost(config *config.HostConfig, hostServices *ServicesRegistry, p2p host l2Repo := l2.NewBatchRepository(config, hostServices, hostStorage, logger) subsService := events.NewLogEventManager(hostServices, logger) - l2Repo.Subscribe(batchListener{newHeads: host.newHeads}) + l2Repo.SubscribeValidatedBatches(batchListener{newHeads: host.newHeads}) hostServices.RegisterService(hostcommon.P2PName, p2p) hostServices.RegisterService(hostcommon.L1BlockRepositoryName, l1Repo) maxWaitForL1Receipt := 6 * config.L1BlockTime // wait ~10 blocks to see if tx gets published before retrying diff --git a/go/host/l2/batchrepository.go b/go/host/l2/batchrepository.go index 27ed9fff7a..4d89f01fa5 100644 --- a/go/host/l2/batchrepository.go +++ b/go/host/l2/batchrepository.go @@ -36,7 +36,8 @@ type batchRepoServiceLocator interface { // Repository is responsible for storing and retrieving batches from the database // If it can't find a batch it will request it from peers. It also subscribes for batch requests from peers and responds to them. type Repository struct { - batchSubscribers *subscription.Manager[host.L2BatchHandler] + batchSubscribers *subscription.Manager[host.L2BatchHandler] // notified when a new batch is added to the repository + validatedBatchSubscribers *subscription.Manager[host.L2BatchHandler] // notified when a new batch is validated by the enclave sl batchRepoServiceLocator storage storage.Storage @@ -46,6 +47,10 @@ type Repository struct { latestBatchSeqNo *big.Int latestSeqNoMutex sync.Mutex + // high watermark for batch sequence numbers validated by our enclave so far. + latestValidatedSeqNo *big.Int + latestValidatedMutex sync.Mutex + // The repository requests batches from peers asynchronously, we don't want to repeatedly spam out requests if we // haven't received a response yet, but we also don't want to wait forever if there's no response. // So we keep track of the last request time and what was requested, using a mutex to avoid concurrent access errors on them @@ -59,13 +64,15 @@ type Repository struct { func NewBatchRepository(cfg *config.HostConfig, hostService batchRepoServiceLocator, storage storage.Storage, logger gethlog.Logger) *Repository { return &Repository{ - batchSubscribers: subscription.NewManager[host.L2BatchHandler](), - sl: hostService, - storage: storage, - isSequencer: cfg.NodeType == common.Sequencer, - latestBatchSeqNo: big.NewInt(0), - running: atomic.Bool{}, - logger: logger, + batchSubscribers: subscription.NewManager[host.L2BatchHandler](), + validatedBatchSubscribers: subscription.NewManager[host.L2BatchHandler](), + sl: hostService, + storage: storage, + isSequencer: cfg.NodeType == common.Sequencer, + latestBatchSeqNo: big.NewInt(0), + latestValidatedSeqNo: big.NewInt(0), + running: atomic.Bool{}, + logger: logger, } } @@ -144,11 +151,15 @@ func (r *Repository) HandleBatchRequest(requesterID string, fromSeqNo *big.Int) } } -// Subscribe registers a handler to be notified of new head batches as they arrive, returns unsubscribe func -func (r *Repository) Subscribe(handler host.L2BatchHandler) func() { +// SubscribeNewBatches registers a handler to be notified of new head batches as they arrive, returns unsubscribe func +func (r *Repository) SubscribeNewBatches(handler host.L2BatchHandler) func() { return r.batchSubscribers.Subscribe(handler) } +func (r *Repository) SubscribeValidatedBatches(handler host.L2BatchHandler) func() { + return r.validatedBatchSubscribers.Subscribe(handler) +} + func (r *Repository) FetchBatchBySeqNo(ctx context.Context, seqNo *big.Int) (*common.ExtBatch, error) { b, err := r.storage.FetchBatchBySeqNo(seqNo.Uint64()) if err != nil { @@ -190,6 +201,19 @@ func (r *Repository) AddBatch(batch *common.ExtBatch) error { return nil } +// NotifyNewValidatedHead - called after an enclave validates a batch, to update the repo's validated head and notify subscribers +func (r *Repository) NotifyNewValidatedHead(batch *common.ExtBatch) { + r.latestValidatedMutex.Lock() + defer r.latestValidatedMutex.Unlock() + if batch.SeqNo().Cmp(r.latestValidatedSeqNo) > 0 { + r.latestValidatedSeqNo = batch.SeqNo() + } + // notify validated batch subscribers, a new batch has been successfully processed by an enclave + for _, subscriber := range r.validatedBatchSubscribers.Subscribers() { + go subscriber.HandleBatch(batch) + } +} + func (r *Repository) fetchBatchFallbackToEnclave(ctx context.Context, seqNo *big.Int) (*common.ExtBatch, error) { b, err := r.sl.Enclaves().LookupBatchBySeqNo(ctx, seqNo) if err != nil {