Skip to content

Commit

Permalink
Host: notify separately for new heads and new validated heads (#1904)
Browse files Browse the repository at this point in the history
  • Loading branch information
BedrockSquirrel authored May 9, 2024
1 parent 5db2d1f commit 52579fe
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 15 deletions.
10 changes: 8 additions & 2 deletions go/common/host/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,20 @@ type L1Publisher interface {

// L2BatchRepository provides an interface for the host to request L2 batch data (live-streaming and historical)
type L2BatchRepository interface {
// Subscribe will register a batch handler to receive new batches as they arrive
Subscribe(handler L2BatchHandler) func()
// SubscribeNewBatches will register a handler to receive new batches from the publisher as they arrive at the host
SubscribeNewBatches(handler L2BatchHandler) func()

// SubscribeValidatedBatches will register a handler to receive batches that have been validated by the enclave
SubscribeValidatedBatches(handler L2BatchHandler) func()

FetchBatchBySeqNo(background context.Context, seqNo *big.Int) (*common.ExtBatch, error)

// AddBatch is used to notify the repository of a new batch, e.g. from the enclave when seq produces one or a rollup is consumed
// Note: it is fine to add batches that the repo already has, it will just ignore them
AddBatch(batch *common.ExtBatch) error

// NotifyNewValidatedHead - called after an enclave validates a batch, to update the repo's validated head and notify subscribers
NotifyNewValidatedHead(batch *common.ExtBatch)
}

// L2BatchHandler is an interface for receiving new batches from the publisher as they arrive
Expand Down
6 changes: 4 additions & 2 deletions go/host/enclave/guardian.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (g *Guardian) Start() error {

// note: not keeping the unsubscribe functions because the lifespan of the guardian is the same as the host
g.sl.L1Repo().Subscribe(g)
g.sl.L2Repo().Subscribe(g)
g.sl.L2Repo().SubscribeNewBatches(g)

// start streaming data from the enclave
go g.streamEnclaveData()
Expand Down Expand Up @@ -659,8 +659,10 @@ func (g *Guardian) streamEnclaveData() {
g.logger.Error("Failed to broadcast batch", log.BatchHashKey, resp.Batch.Hash(), log.ErrKey, err)
}
} else {
g.logger.Debug("Received batch from enclave", log.BatchSeqNoKey, resp.Batch.Header.SequencerOrderNo, log.BatchHashKey, resp.Batch.Hash())
g.logger.Debug("Received validated batch from enclave", log.BatchSeqNoKey, resp.Batch.Header.SequencerOrderNo, log.BatchHashKey, resp.Batch.Hash())
}
// Notify the L2 repo that an enclave has validated a batch, so it can update its validated head and notify subscribers
g.sl.L2Repo().NotifyNewValidatedHead(resp.Batch)
g.state.OnProcessedBatch(resp.Batch.Header.SequencerOrderNo)
}

Expand Down
2 changes: 1 addition & 1 deletion go/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func NewHost(config *config.HostConfig, hostServices *ServicesRegistry, p2p host
l2Repo := l2.NewBatchRepository(config, hostServices, hostStorage, logger)
subsService := events.NewLogEventManager(hostServices, logger)

l2Repo.Subscribe(batchListener{newHeads: host.newHeads})
l2Repo.SubscribeValidatedBatches(batchListener{newHeads: host.newHeads})
hostServices.RegisterService(hostcommon.P2PName, p2p)
hostServices.RegisterService(hostcommon.L1BlockRepositoryName, l1Repo)
maxWaitForL1Receipt := 6 * config.L1BlockTime // wait ~10 blocks to see if tx gets published before retrying
Expand Down
44 changes: 34 additions & 10 deletions go/host/l2/batchrepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type batchRepoServiceLocator interface {
// Repository is responsible for storing and retrieving batches from the database
// If it can't find a batch it will request it from peers. It also subscribes for batch requests from peers and responds to them.
type Repository struct {
batchSubscribers *subscription.Manager[host.L2BatchHandler]
batchSubscribers *subscription.Manager[host.L2BatchHandler] // notified when a new batch is added to the repository
validatedBatchSubscribers *subscription.Manager[host.L2BatchHandler] // notified when a new batch is validated by the enclave

sl batchRepoServiceLocator
storage storage.Storage
Expand All @@ -46,6 +47,10 @@ type Repository struct {
latestBatchSeqNo *big.Int
latestSeqNoMutex sync.Mutex

// high watermark for batch sequence numbers validated by our enclave so far.
latestValidatedSeqNo *big.Int
latestValidatedMutex sync.Mutex

// The repository requests batches from peers asynchronously, we don't want to repeatedly spam out requests if we
// haven't received a response yet, but we also don't want to wait forever if there's no response.
// So we keep track of the last request time and what was requested, using a mutex to avoid concurrent access errors on them
Expand All @@ -59,13 +64,15 @@ type Repository struct {

func NewBatchRepository(cfg *config.HostConfig, hostService batchRepoServiceLocator, storage storage.Storage, logger gethlog.Logger) *Repository {
return &Repository{
batchSubscribers: subscription.NewManager[host.L2BatchHandler](),
sl: hostService,
storage: storage,
isSequencer: cfg.NodeType == common.Sequencer,
latestBatchSeqNo: big.NewInt(0),
running: atomic.Bool{},
logger: logger,
batchSubscribers: subscription.NewManager[host.L2BatchHandler](),
validatedBatchSubscribers: subscription.NewManager[host.L2BatchHandler](),
sl: hostService,
storage: storage,
isSequencer: cfg.NodeType == common.Sequencer,
latestBatchSeqNo: big.NewInt(0),
latestValidatedSeqNo: big.NewInt(0),
running: atomic.Bool{},
logger: logger,
}
}

Expand Down Expand Up @@ -144,11 +151,15 @@ func (r *Repository) HandleBatchRequest(requesterID string, fromSeqNo *big.Int)
}
}

// Subscribe registers a handler to be notified of new head batches as they arrive, returns unsubscribe func
func (r *Repository) Subscribe(handler host.L2BatchHandler) func() {
// SubscribeNewBatches registers a handler to be notified of new head batches as they arrive, returns unsubscribe func
func (r *Repository) SubscribeNewBatches(handler host.L2BatchHandler) func() {
return r.batchSubscribers.Subscribe(handler)
}

func (r *Repository) SubscribeValidatedBatches(handler host.L2BatchHandler) func() {
return r.validatedBatchSubscribers.Subscribe(handler)
}

func (r *Repository) FetchBatchBySeqNo(ctx context.Context, seqNo *big.Int) (*common.ExtBatch, error) {
b, err := r.storage.FetchBatchBySeqNo(seqNo.Uint64())
if err != nil {
Expand Down Expand Up @@ -190,6 +201,19 @@ func (r *Repository) AddBatch(batch *common.ExtBatch) error {
return nil
}

// NotifyNewValidatedHead - called after an enclave validates a batch, to update the repo's validated head and notify subscribers
func (r *Repository) NotifyNewValidatedHead(batch *common.ExtBatch) {
r.latestValidatedMutex.Lock()
defer r.latestValidatedMutex.Unlock()
if batch.SeqNo().Cmp(r.latestValidatedSeqNo) > 0 {
r.latestValidatedSeqNo = batch.SeqNo()
}
// notify validated batch subscribers, a new batch has been successfully processed by an enclave
for _, subscriber := range r.validatedBatchSubscribers.Subscribers() {
go subscriber.HandleBatch(batch)
}
}

func (r *Repository) fetchBatchFallbackToEnclave(ctx context.Context, seqNo *big.Int) (*common.ExtBatch, error) {
b, err := r.sl.Enclaves().LookupBatchBySeqNo(ctx, seqNo)
if err != nil {
Expand Down

0 comments on commit 52579fe

Please sign in to comment.