Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Host: notify separately for new heads and new validated heads #1904

Merged
merged 1 commit into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions go/common/host/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,20 @@ type L1Publisher interface {

// L2BatchRepository provides an interface for the host to request L2 batch data (live-streaming and historical)
type L2BatchRepository interface {
// Subscribe will register a batch handler to receive new batches as they arrive
Subscribe(handler L2BatchHandler) func()
// SubscribeNewBatches will register a handler to receive new batches from the publisher as they arrive at the host
SubscribeNewBatches(handler L2BatchHandler) func()

// SubscribeValidatedBatches will register a handler to receive batches that have been validated by the enclave
SubscribeValidatedBatches(handler L2BatchHandler) func()

FetchBatchBySeqNo(background context.Context, seqNo *big.Int) (*common.ExtBatch, error)

// AddBatch is used to notify the repository of a new batch, e.g. from the enclave when seq produces one or a rollup is consumed
// Note: it is fine to add batches that the repo already has, it will just ignore them
AddBatch(batch *common.ExtBatch) error

// NotifyNewValidatedHead - called after an enclave validates a batch, to update the repo's validated head and notify subscribers
NotifyNewValidatedHead(batch *common.ExtBatch)
}

// L2BatchHandler is an interface for receiving new batches from the publisher as they arrive
Expand Down
6 changes: 4 additions & 2 deletions go/host/enclave/guardian.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (g *Guardian) Start() error {

// note: not keeping the unsubscribe functions because the lifespan of the guardian is the same as the host
g.sl.L1Repo().Subscribe(g)
g.sl.L2Repo().Subscribe(g)
g.sl.L2Repo().SubscribeNewBatches(g)

// start streaming data from the enclave
go g.streamEnclaveData()
Expand Down Expand Up @@ -659,8 +659,10 @@ func (g *Guardian) streamEnclaveData() {
g.logger.Error("Failed to broadcast batch", log.BatchHashKey, resp.Batch.Hash(), log.ErrKey, err)
}
} else {
g.logger.Debug("Received batch from enclave", log.BatchSeqNoKey, resp.Batch.Header.SequencerOrderNo, log.BatchHashKey, resp.Batch.Hash())
g.logger.Debug("Received validated batch from enclave", log.BatchSeqNoKey, resp.Batch.Header.SequencerOrderNo, log.BatchHashKey, resp.Batch.Hash())
}
// Notify the L2 repo that an enclave has validated a batch, so it can update its validated head and notify subscribers
g.sl.L2Repo().NotifyNewValidatedHead(resp.Batch)
g.state.OnProcessedBatch(resp.Batch.Header.SequencerOrderNo)
}

Expand Down
2 changes: 1 addition & 1 deletion go/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func NewHost(config *config.HostConfig, hostServices *ServicesRegistry, p2p host
l2Repo := l2.NewBatchRepository(config, hostServices, hostStorage, logger)
subsService := events.NewLogEventManager(hostServices, logger)

l2Repo.Subscribe(batchListener{newHeads: host.newHeads})
l2Repo.SubscribeValidatedBatches(batchListener{newHeads: host.newHeads})
hostServices.RegisterService(hostcommon.P2PName, p2p)
hostServices.RegisterService(hostcommon.L1BlockRepositoryName, l1Repo)
maxWaitForL1Receipt := 6 * config.L1BlockTime // wait ~10 blocks to see if tx gets published before retrying
Expand Down
44 changes: 34 additions & 10 deletions go/host/l2/batchrepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type batchRepoServiceLocator interface {
// Repository is responsible for storing and retrieving batches from the database
// If it can't find a batch it will request it from peers. It also subscribes for batch requests from peers and responds to them.
type Repository struct {
batchSubscribers *subscription.Manager[host.L2BatchHandler]
batchSubscribers *subscription.Manager[host.L2BatchHandler] // notified when a new batch is added to the repository
validatedBatchSubscribers *subscription.Manager[host.L2BatchHandler] // notified when a new batch is validated by the enclave

sl batchRepoServiceLocator
storage storage.Storage
Expand All @@ -46,6 +47,10 @@ type Repository struct {
latestBatchSeqNo *big.Int
latestSeqNoMutex sync.Mutex

// high watermark for batch sequence numbers validated by our enclave so far.
latestValidatedSeqNo *big.Int
latestValidatedMutex sync.Mutex

// The repository requests batches from peers asynchronously, we don't want to repeatedly spam out requests if we
// haven't received a response yet, but we also don't want to wait forever if there's no response.
// So we keep track of the last request time and what was requested, using a mutex to avoid concurrent access errors on them
Expand All @@ -59,13 +64,15 @@ type Repository struct {

func NewBatchRepository(cfg *config.HostConfig, hostService batchRepoServiceLocator, storage storage.Storage, logger gethlog.Logger) *Repository {
return &Repository{
batchSubscribers: subscription.NewManager[host.L2BatchHandler](),
sl: hostService,
storage: storage,
isSequencer: cfg.NodeType == common.Sequencer,
latestBatchSeqNo: big.NewInt(0),
running: atomic.Bool{},
logger: logger,
batchSubscribers: subscription.NewManager[host.L2BatchHandler](),
validatedBatchSubscribers: subscription.NewManager[host.L2BatchHandler](),
sl: hostService,
storage: storage,
isSequencer: cfg.NodeType == common.Sequencer,
latestBatchSeqNo: big.NewInt(0),
latestValidatedSeqNo: big.NewInt(0),
running: atomic.Bool{},
logger: logger,
}
}

Expand Down Expand Up @@ -144,11 +151,15 @@ func (r *Repository) HandleBatchRequest(requesterID string, fromSeqNo *big.Int)
}
}

// Subscribe registers a handler to be notified of new head batches as they arrive, returns unsubscribe func
func (r *Repository) Subscribe(handler host.L2BatchHandler) func() {
// SubscribeNewBatches registers a handler to be notified of new head batches as they arrive, returns unsubscribe func
func (r *Repository) SubscribeNewBatches(handler host.L2BatchHandler) func() {
return r.batchSubscribers.Subscribe(handler)
}

func (r *Repository) SubscribeValidatedBatches(handler host.L2BatchHandler) func() {
return r.validatedBatchSubscribers.Subscribe(handler)
}

func (r *Repository) FetchBatchBySeqNo(ctx context.Context, seqNo *big.Int) (*common.ExtBatch, error) {
b, err := r.storage.FetchBatchBySeqNo(seqNo.Uint64())
if err != nil {
Expand Down Expand Up @@ -190,6 +201,19 @@ func (r *Repository) AddBatch(batch *common.ExtBatch) error {
return nil
}

// NotifyNewValidatedHead - called after an enclave validates a batch, to update the repo's validated head and notify subscribers
func (r *Repository) NotifyNewValidatedHead(batch *common.ExtBatch) {
r.latestValidatedMutex.Lock()
defer r.latestValidatedMutex.Unlock()
if batch.SeqNo().Cmp(r.latestValidatedSeqNo) > 0 {
r.latestValidatedSeqNo = batch.SeqNo()
}
// notify validated batch subscribers, a new batch has been successfully processed by an enclave
for _, subscriber := range r.validatedBatchSubscribers.Subscribers() {
go subscriber.HandleBatch(batch)
}
}

func (r *Repository) fetchBatchFallbackToEnclave(ctx context.Context, seqNo *big.Int) (*common.ExtBatch, error) {
b, err := r.sl.Enclaves().LookupBatchBySeqNo(ctx, seqNo)
if err != nil {
Expand Down
Loading