diff --git a/go/common/host/services.go b/go/common/host/services.go index 8b76acf2f3..d9e461b132 100644 --- a/go/common/host/services.go +++ b/go/common/host/services.go @@ -118,14 +118,20 @@ type L1Publisher interface { // L2BatchRepository provides an interface for the host to request L2 batch data (live-streaming and historical) type L2BatchRepository interface { - // Subscribe will register a batch handler to receive new batches as they arrive - Subscribe(handler L2BatchHandler) func() + // SubscribeNewBatches will register a handler to receive new batches from the publisher as they arrive + SubscribeNewBatches(handler L2BatchHandler) func() + + // SubscribeValidatedBatches will register a handler to receive batches that have been validated by the enclave + SubscribeValidatedBatches(handler L2BatchHandler) func() FetchBatchBySeqNo(background context.Context, seqNo *big.Int) (*common.ExtBatch, error) // AddBatch is used to notify the repository of a new batch, e.g. from the enclave when seq produces one or a rollup is consumed // Note: it is fine to add batches that the repo already has, it will just ignore them AddBatch(batch *common.ExtBatch) error + + // NotifyNewValidatedHead - called after an enclave validates a batch, to update the repo's validated head and notify subscribers + NotifyNewValidatedHead(batch *common.ExtBatch) } // L2BatchHandler is an interface for receiving new batches from the publisher as they arrive diff --git a/go/common/rpc/generated/enclave.pb.go b/go/common/rpc/generated/enclave.pb.go index 11a4e05c49..cbe08ca427 100644 --- a/go/common/rpc/generated/enclave.pb.go +++ b/go/common/rpc/generated/enclave.pb.go @@ -4942,7 +4942,7 @@ var file_enclave_proto_depIdxs = []int32{ 50, // 54: generated.EnclaveProto.GetTransactionReceipt:input_type -> generated.GetTransactionReceiptRequest 52, // 55: generated.EnclaveProto.GetBalance:input_type -> generated.GetBalanceRequest 54, // 56: generated.EnclaveProto.GetCode:input_type -> generated.GetCodeRequest - 56, // 57: generated.EnclaveProto.Subscribe:input_type -> generated.SubscribeRequest + 56, // 57: generated.EnclaveProto.SubscribeNewBatches:input_type -> generated.SubscribeRequest 58, // 58: generated.EnclaveProto.Unsubscribe:input_type -> generated.UnsubscribeRequest 60, // 59: generated.EnclaveProto.EstimateGas:input_type -> generated.EstimateGasRequest 62, // 60: generated.EnclaveProto.GetLogs:input_type -> generated.GetLogsRequest @@ -4973,7 +4973,7 @@ var file_enclave_proto_depIdxs = []int32{ 51, // 85: generated.EnclaveProto.GetTransactionReceipt:output_type -> generated.GetTransactionReceiptResponse 53, // 86: generated.EnclaveProto.GetBalance:output_type -> generated.GetBalanceResponse 55, // 87: generated.EnclaveProto.GetCode:output_type -> generated.GetCodeResponse - 57, // 88: generated.EnclaveProto.Subscribe:output_type -> generated.SubscribeResponse + 57, // 88: generated.EnclaveProto.SubscribeNewBatches:output_type -> generated.SubscribeResponse 59, // 89: generated.EnclaveProto.Unsubscribe:output_type -> generated.UnsubscribeResponse 61, // 90: generated.EnclaveProto.EstimateGas:output_type -> generated.EstimateGasResponse 63, // 91: generated.EnclaveProto.GetLogs:output_type -> generated.GetLogsResponse diff --git a/go/common/rpc/generated/enclave_grpc.pb.go b/go/common/rpc/generated/enclave_grpc.pb.go index ceb3485c87..229e8b0b94 100644 --- a/go/common/rpc/generated/enclave_grpc.pb.go +++ b/go/common/rpc/generated/enclave_grpc.pb.go @@ -34,7 +34,7 @@ const ( EnclaveProto_GetTransactionReceipt_FullMethodName = "/generated.EnclaveProto/GetTransactionReceipt" EnclaveProto_GetBalance_FullMethodName = "/generated.EnclaveProto/GetBalance" EnclaveProto_GetCode_FullMethodName = "/generated.EnclaveProto/GetCode" - EnclaveProto_Subscribe_FullMethodName = "/generated.EnclaveProto/Subscribe" + EnclaveProto_Subscribe_FullMethodName = "/generated.EnclaveProto/SubscribeNewBatches" EnclaveProto_Unsubscribe_FullMethodName = "/generated.EnclaveProto/Unsubscribe" EnclaveProto_EstimateGas_FullMethodName = "/generated.EnclaveProto/EstimateGas" EnclaveProto_GetLogs_FullMethodName = "/generated.EnclaveProto/GetLogs" @@ -549,7 +549,7 @@ func (UnimplementedEnclaveProtoServer) GetCode(context.Context, *GetCodeRequest) return nil, status.Errorf(codes.Unimplemented, "method GetCode not implemented") } func (UnimplementedEnclaveProtoServer) Subscribe(context.Context, *SubscribeRequest) (*SubscribeResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Subscribe not implemented") + return nil, status.Errorf(codes.Unimplemented, "method SubscribeNewBatches not implemented") } func (UnimplementedEnclaveProtoServer) Unsubscribe(context.Context, *UnsubscribeRequest) (*UnsubscribeResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Unsubscribe not implemented") @@ -1238,7 +1238,7 @@ var EnclaveProto_ServiceDesc = grpc.ServiceDesc{ Handler: _EnclaveProto_GetCode_Handler, }, { - MethodName: "Subscribe", + MethodName: "SubscribeNewBatches", Handler: _EnclaveProto_Subscribe_Handler, }, { diff --git a/go/host/enclave/guardian.go b/go/host/enclave/guardian.go index 83fa49894e..2a4374a869 100644 --- a/go/host/enclave/guardian.go +++ b/go/host/enclave/guardian.go @@ -128,7 +128,7 @@ func (g *Guardian) Start() error { // note: not keeping the unsubscribe functions because the lifespan of the guardian is the same as the host g.sl.L1Repo().Subscribe(g) - g.sl.L2Repo().Subscribe(g) + g.sl.L2Repo().SubscribeNewBatches(g) // start streaming data from the enclave go g.streamEnclaveData() @@ -659,8 +659,10 @@ func (g *Guardian) streamEnclaveData() { g.logger.Error("Failed to broadcast batch", log.BatchHashKey, resp.Batch.Hash(), log.ErrKey, err) } } else { - g.logger.Debug("Received batch from enclave", log.BatchSeqNoKey, resp.Batch.Header.SequencerOrderNo, log.BatchHashKey, resp.Batch.Hash()) + g.logger.Debug("Received validated batch from enclave", log.BatchSeqNoKey, resp.Batch.Header.SequencerOrderNo, log.BatchHashKey, resp.Batch.Hash()) } + // Notify the L2 repo that an enclave has validated a batch, so it can update its validated head and notify subscribers + g.sl.L2Repo().NotifyNewValidatedHead(resp.Batch) g.state.OnProcessedBatch(resp.Batch.Header.SequencerOrderNo) } diff --git a/go/host/host.go b/go/host/host.go index 14b83e5c37..7de4c8bccd 100644 --- a/go/host/host.go +++ b/go/host/host.go @@ -97,7 +97,7 @@ func NewHost(config *config.HostConfig, hostServices *ServicesRegistry, p2p host l2Repo := l2.NewBatchRepository(config, hostServices, hostStorage, logger) subsService := events.NewLogEventManager(hostServices, logger) - l2Repo.Subscribe(batchListener{newHeads: host.newHeads}) + l2Repo.SubscribeValidatedBatches(batchListener{newHeads: host.newHeads}) hostServices.RegisterService(hostcommon.P2PName, p2p) hostServices.RegisterService(hostcommon.L1BlockRepositoryName, l1Repo) maxWaitForL1Receipt := 6 * config.L1BlockTime // wait ~10 blocks to see if tx gets published before retrying @@ -165,14 +165,14 @@ func (h *host) SubmitAndBroadcastTx(ctx context.Context, encryptedParams common. func (h *host) SubscribeLogs(id rpc.ID, encryptedLogSubscription common.EncryptedParamsLogSubscription, matchedLogsCh chan []byte) error { if h.stopControl.IsStopping() { - return responses.ToInternalError(fmt.Errorf("requested Subscribe with the host stopping")) + return responses.ToInternalError(fmt.Errorf("requested SubscribeNewBatches with the host stopping")) } return h.services.LogSubs().Subscribe(id, encryptedLogSubscription, matchedLogsCh) } func (h *host) UnsubscribeLogs(id rpc.ID) { if h.stopControl.IsStopping() { - h.logger.Debug("requested Subscribe with the host stopping") + h.logger.Debug("requested SubscribeNewBatches with the host stopping") } h.services.LogSubs().Unsubscribe(id) } diff --git a/go/host/l2/batchrepository.go b/go/host/l2/batchrepository.go index 27ed9fff7a..4d89f01fa5 100644 --- a/go/host/l2/batchrepository.go +++ b/go/host/l2/batchrepository.go @@ -36,7 +36,8 @@ type batchRepoServiceLocator interface { // Repository is responsible for storing and retrieving batches from the database // If it can't find a batch it will request it from peers. It also subscribes for batch requests from peers and responds to them. type Repository struct { - batchSubscribers *subscription.Manager[host.L2BatchHandler] + batchSubscribers *subscription.Manager[host.L2BatchHandler] // notified when a new batch is added to the repository + validatedBatchSubscribers *subscription.Manager[host.L2BatchHandler] // notified when a new batch is validated by the enclave sl batchRepoServiceLocator storage storage.Storage @@ -46,6 +47,10 @@ type Repository struct { latestBatchSeqNo *big.Int latestSeqNoMutex sync.Mutex + // high watermark for batch sequence numbers validated by our enclave so far. + latestValidatedSeqNo *big.Int + latestValidatedMutex sync.Mutex + // The repository requests batches from peers asynchronously, we don't want to repeatedly spam out requests if we // haven't received a response yet, but we also don't want to wait forever if there's no response. // So we keep track of the last request time and what was requested, using a mutex to avoid concurrent access errors on them @@ -59,13 +64,15 @@ type Repository struct { func NewBatchRepository(cfg *config.HostConfig, hostService batchRepoServiceLocator, storage storage.Storage, logger gethlog.Logger) *Repository { return &Repository{ - batchSubscribers: subscription.NewManager[host.L2BatchHandler](), - sl: hostService, - storage: storage, - isSequencer: cfg.NodeType == common.Sequencer, - latestBatchSeqNo: big.NewInt(0), - running: atomic.Bool{}, - logger: logger, + batchSubscribers: subscription.NewManager[host.L2BatchHandler](), + validatedBatchSubscribers: subscription.NewManager[host.L2BatchHandler](), + sl: hostService, + storage: storage, + isSequencer: cfg.NodeType == common.Sequencer, + latestBatchSeqNo: big.NewInt(0), + latestValidatedSeqNo: big.NewInt(0), + running: atomic.Bool{}, + logger: logger, } } @@ -144,11 +151,15 @@ func (r *Repository) HandleBatchRequest(requesterID string, fromSeqNo *big.Int) } } -// Subscribe registers a handler to be notified of new head batches as they arrive, returns unsubscribe func -func (r *Repository) Subscribe(handler host.L2BatchHandler) func() { +// SubscribeNewBatches registers a handler to be notified of new head batches as they arrive, returns unsubscribe func +func (r *Repository) SubscribeNewBatches(handler host.L2BatchHandler) func() { return r.batchSubscribers.Subscribe(handler) } +func (r *Repository) SubscribeValidatedBatches(handler host.L2BatchHandler) func() { + return r.validatedBatchSubscribers.Subscribe(handler) +} + func (r *Repository) FetchBatchBySeqNo(ctx context.Context, seqNo *big.Int) (*common.ExtBatch, error) { b, err := r.storage.FetchBatchBySeqNo(seqNo.Uint64()) if err != nil { @@ -190,6 +201,19 @@ func (r *Repository) AddBatch(batch *common.ExtBatch) error { return nil } +// NotifyNewValidatedHead - called after an enclave validates a batch, to update the repo's validated head and notify subscribers +func (r *Repository) NotifyNewValidatedHead(batch *common.ExtBatch) { + r.latestValidatedMutex.Lock() + defer r.latestValidatedMutex.Unlock() + if batch.SeqNo().Cmp(r.latestValidatedSeqNo) > 0 { + r.latestValidatedSeqNo = batch.SeqNo() + } + // notify validated batch subscribers, a new batch has been successfully processed by an enclave + for _, subscriber := range r.validatedBatchSubscribers.Subscribers() { + go subscriber.HandleBatch(batch) + } +} + func (r *Repository) fetchBatchFallbackToEnclave(ctx context.Context, seqNo *big.Int) (*common.ExtBatch, error) { b, err := r.sl.Enclaves().LookupBatchBySeqNo(ctx, seqNo) if err != nil { diff --git a/lib/gethfork/rpc/client.go b/lib/gethfork/rpc/client.go index 232b74fbdd..0605201888 100644 --- a/lib/gethfork/rpc/client.go +++ b/lib/gethfork/rpc/client.go @@ -145,7 +145,7 @@ type requestOp struct { ids []json.RawMessage err error resp chan []*jsonrpcMessage // the response goes here - sub *ClientSubscription // set for Subscribe requests. + sub *ClientSubscription // set for SubscribeNewBatches requests. hadResponse bool // true when the request was responded to } @@ -515,10 +515,10 @@ func (c *Client) Subscribe(ctx context.Context, namespace string, channel interf // Check type of channel first. chanVal := reflect.ValueOf(channel) if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 { - panic(fmt.Sprintf("channel argument of Subscribe has type %T, need writable channel", channel)) + panic(fmt.Sprintf("channel argument of SubscribeNewBatches has type %T, need writable channel", channel)) } if chanVal.IsNil() { - panic("channel given to Subscribe must not be nil") + panic("channel given to SubscribeNewBatches must not be nil") } if c.isHTTP { return nil, ErrNotificationsUnsupported diff --git a/lib/gethfork/rpc/errors.go b/lib/gethfork/rpc/errors.go index 438aff218c..c8b2a73a13 100644 --- a/lib/gethfork/rpc/errors.go +++ b/lib/gethfork/rpc/errors.go @@ -93,7 +93,7 @@ func (e notificationsUnsupportedError) ErrorCode() int { return -32601 } // -32601 (method not found) are equivalent to notificationsUnsupportedError. This is // done to enable the following pattern: // -// sub, err := client.Subscribe(...) +// sub, err := client.SubscribeNewBatches(...) // if errors.Is(err, rpc.ErrNotificationsUnsupported) { // // server doesn't support subscriptions // } diff --git a/lib/gethfork/rpc/subscription.go b/lib/gethfork/rpc/subscription.go index ca432190fb..d4177e3e54 100644 --- a/lib/gethfork/rpc/subscription.go +++ b/lib/gethfork/rpc/subscription.go @@ -212,7 +212,7 @@ func (s *Subscription) MarshalJSON() ([]byte, error) { return json.Marshal(s.ID) } -// ClientSubscription is a subscription established through the Client's Subscribe or +// ClientSubscription is a subscription established through the Client's SubscribeNewBatches or // EthSubscribe methods. type ClientSubscription struct { client *Client