Skip to content

Commit

Permalink
Host: notify separately for new heads and new validated heads
Browse files Browse the repository at this point in the history
  • Loading branch information
BedrockSquirrel committed May 9, 2024
1 parent 01d39df commit ccd026b
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 27 deletions.
10 changes: 8 additions & 2 deletions go/common/host/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,20 @@ type L1Publisher interface {

// L2BatchRepository provides an interface for the host to request L2 batch data (live-streaming and historical)
type L2BatchRepository interface {
// Subscribe will register a batch handler to receive new batches as they arrive
Subscribe(handler L2BatchHandler) func()
// SubscribeNewBatches will register a handler to receive new batches from the publisher as they arrive
SubscribeNewBatches(handler L2BatchHandler) func()

// SubscribeValidatedBatches will register a handler to receive batches that have been validated by the enclave
SubscribeValidatedBatches(handler L2BatchHandler) func()

FetchBatchBySeqNo(background context.Context, seqNo *big.Int) (*common.ExtBatch, error)

// AddBatch is used to notify the repository of a new batch, e.g. from the enclave when seq produces one or a rollup is consumed
// Note: it is fine to add batches that the repo already has, it will just ignore them
AddBatch(batch *common.ExtBatch) error

// NotifyNewValidatedHead - called after an enclave validates a batch, to update the repo's validated head and notify subscribers
NotifyNewValidatedHead(batch *common.ExtBatch)
}

// L2BatchHandler is an interface for receiving new batches from the publisher as they arrive
Expand Down
4 changes: 2 additions & 2 deletions go/common/rpc/generated/enclave.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions go/common/rpc/generated/enclave_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions go/host/enclave/guardian.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (g *Guardian) Start() error {

// note: not keeping the unsubscribe functions because the lifespan of the guardian is the same as the host
g.sl.L1Repo().Subscribe(g)
g.sl.L2Repo().Subscribe(g)
g.sl.L2Repo().SubscribeNewBatches(g)

// start streaming data from the enclave
go g.streamEnclaveData()
Expand Down Expand Up @@ -659,8 +659,10 @@ func (g *Guardian) streamEnclaveData() {
g.logger.Error("Failed to broadcast batch", log.BatchHashKey, resp.Batch.Hash(), log.ErrKey, err)
}
} else {
g.logger.Debug("Received batch from enclave", log.BatchSeqNoKey, resp.Batch.Header.SequencerOrderNo, log.BatchHashKey, resp.Batch.Hash())
g.logger.Debug("Received validated batch from enclave", log.BatchSeqNoKey, resp.Batch.Header.SequencerOrderNo, log.BatchHashKey, resp.Batch.Hash())
}
// Notify the L2 repo that an enclave has validated a batch, so it can update its validated head and notify subscribers
g.sl.L2Repo().NotifyNewValidatedHead(resp.Batch)
g.state.OnProcessedBatch(resp.Batch.Header.SequencerOrderNo)
}

Expand Down
6 changes: 3 additions & 3 deletions go/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func NewHost(config *config.HostConfig, hostServices *ServicesRegistry, p2p host
l2Repo := l2.NewBatchRepository(config, hostServices, hostStorage, logger)
subsService := events.NewLogEventManager(hostServices, logger)

l2Repo.Subscribe(batchListener{newHeads: host.newHeads})
l2Repo.SubscribeValidatedBatches(batchListener{newHeads: host.newHeads})
hostServices.RegisterService(hostcommon.P2PName, p2p)
hostServices.RegisterService(hostcommon.L1BlockRepositoryName, l1Repo)
maxWaitForL1Receipt := 6 * config.L1BlockTime // wait ~10 blocks to see if tx gets published before retrying
Expand Down Expand Up @@ -165,14 +165,14 @@ func (h *host) SubmitAndBroadcastTx(ctx context.Context, encryptedParams common.

func (h *host) SubscribeLogs(id rpc.ID, encryptedLogSubscription common.EncryptedParamsLogSubscription, matchedLogsCh chan []byte) error {
if h.stopControl.IsStopping() {
return responses.ToInternalError(fmt.Errorf("requested Subscribe with the host stopping"))
return responses.ToInternalError(fmt.Errorf("requested SubscribeNewBatches with the host stopping"))
}
return h.services.LogSubs().Subscribe(id, encryptedLogSubscription, matchedLogsCh)
}

func (h *host) UnsubscribeLogs(id rpc.ID) {
if h.stopControl.IsStopping() {
h.logger.Debug("requested Subscribe with the host stopping")
h.logger.Debug("requested SubscribeNewBatches with the host stopping")
}
h.services.LogSubs().Unsubscribe(id)
}
Expand Down
44 changes: 34 additions & 10 deletions go/host/l2/batchrepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type batchRepoServiceLocator interface {
// Repository is responsible for storing and retrieving batches from the database
// If it can't find a batch it will request it from peers. It also subscribes for batch requests from peers and responds to them.
type Repository struct {
batchSubscribers *subscription.Manager[host.L2BatchHandler]
batchSubscribers *subscription.Manager[host.L2BatchHandler] // notified when a new batch is added to the repository
validatedBatchSubscribers *subscription.Manager[host.L2BatchHandler] // notified when a new batch is validated by the enclave

sl batchRepoServiceLocator
storage storage.Storage
Expand All @@ -46,6 +47,10 @@ type Repository struct {
latestBatchSeqNo *big.Int
latestSeqNoMutex sync.Mutex

// high watermark for batch sequence numbers validated by our enclave so far.
latestValidatedSeqNo *big.Int
latestValidatedMutex sync.Mutex

// The repository requests batches from peers asynchronously, we don't want to repeatedly spam out requests if we
// haven't received a response yet, but we also don't want to wait forever if there's no response.
// So we keep track of the last request time and what was requested, using a mutex to avoid concurrent access errors on them
Expand All @@ -59,13 +64,15 @@ type Repository struct {

func NewBatchRepository(cfg *config.HostConfig, hostService batchRepoServiceLocator, storage storage.Storage, logger gethlog.Logger) *Repository {
return &Repository{
batchSubscribers: subscription.NewManager[host.L2BatchHandler](),
sl: hostService,
storage: storage,
isSequencer: cfg.NodeType == common.Sequencer,
latestBatchSeqNo: big.NewInt(0),
running: atomic.Bool{},
logger: logger,
batchSubscribers: subscription.NewManager[host.L2BatchHandler](),
validatedBatchSubscribers: subscription.NewManager[host.L2BatchHandler](),
sl: hostService,
storage: storage,
isSequencer: cfg.NodeType == common.Sequencer,
latestBatchSeqNo: big.NewInt(0),
latestValidatedSeqNo: big.NewInt(0),
running: atomic.Bool{},
logger: logger,
}
}

Expand Down Expand Up @@ -144,11 +151,15 @@ func (r *Repository) HandleBatchRequest(requesterID string, fromSeqNo *big.Int)
}
}

// Subscribe registers a handler to be notified of new head batches as they arrive, returns unsubscribe func
func (r *Repository) Subscribe(handler host.L2BatchHandler) func() {
// SubscribeNewBatches registers a handler to be notified of new head batches as they arrive, returns unsubscribe func
func (r *Repository) SubscribeNewBatches(handler host.L2BatchHandler) func() {
return r.batchSubscribers.Subscribe(handler)
}

func (r *Repository) SubscribeValidatedBatches(handler host.L2BatchHandler) func() {
return r.validatedBatchSubscribers.Subscribe(handler)
}

func (r *Repository) FetchBatchBySeqNo(ctx context.Context, seqNo *big.Int) (*common.ExtBatch, error) {
b, err := r.storage.FetchBatchBySeqNo(seqNo.Uint64())
if err != nil {
Expand Down Expand Up @@ -190,6 +201,19 @@ func (r *Repository) AddBatch(batch *common.ExtBatch) error {
return nil
}

// NotifyNewValidatedHead - called after an enclave validates a batch, to update the repo's validated head and notify subscribers
func (r *Repository) NotifyNewValidatedHead(batch *common.ExtBatch) {
r.latestValidatedMutex.Lock()
defer r.latestValidatedMutex.Unlock()
if batch.SeqNo().Cmp(r.latestValidatedSeqNo) > 0 {
r.latestValidatedSeqNo = batch.SeqNo()
}
// notify validated batch subscribers, a new batch has been successfully processed by an enclave
for _, subscriber := range r.validatedBatchSubscribers.Subscribers() {
go subscriber.HandleBatch(batch)
}
}

func (r *Repository) fetchBatchFallbackToEnclave(ctx context.Context, seqNo *big.Int) (*common.ExtBatch, error) {
b, err := r.sl.Enclaves().LookupBatchBySeqNo(ctx, seqNo)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions lib/gethfork/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type requestOp struct {
ids []json.RawMessage
err error
resp chan []*jsonrpcMessage // the response goes here
sub *ClientSubscription // set for Subscribe requests.
sub *ClientSubscription // set for SubscribeNewBatches requests.
hadResponse bool // true when the request was responded to
}

Expand Down Expand Up @@ -515,10 +515,10 @@ func (c *Client) Subscribe(ctx context.Context, namespace string, channel interf
// Check type of channel first.
chanVal := reflect.ValueOf(channel)
if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
panic(fmt.Sprintf("channel argument of Subscribe has type %T, need writable channel", channel))
panic(fmt.Sprintf("channel argument of SubscribeNewBatches has type %T, need writable channel", channel))
}
if chanVal.IsNil() {
panic("channel given to Subscribe must not be nil")
panic("channel given to SubscribeNewBatches must not be nil")
}
if c.isHTTP {
return nil, ErrNotificationsUnsupported
Expand Down
2 changes: 1 addition & 1 deletion lib/gethfork/rpc/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (e notificationsUnsupportedError) ErrorCode() int { return -32601 }
// -32601 (method not found) are equivalent to notificationsUnsupportedError. This is
// done to enable the following pattern:
//
// sub, err := client.Subscribe(...)
// sub, err := client.SubscribeNewBatches(...)
// if errors.Is(err, rpc.ErrNotificationsUnsupported) {
// // server doesn't support subscriptions
// }
Expand Down
2 changes: 1 addition & 1 deletion lib/gethfork/rpc/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (s *Subscription) MarshalJSON() ([]byte, error) {
return json.Marshal(s.ID)
}

// ClientSubscription is a subscription established through the Client's Subscribe or
// ClientSubscription is a subscription established through the Client's SubscribeNewBatches or
// EthSubscribe methods.
type ClientSubscription struct {
client *Client
Expand Down

0 comments on commit ccd026b

Please sign in to comment.