From fd68eeaf77d63710a986c9fd790b0681b665a3ec Mon Sep 17 00:00:00 2001 From: Matt Curtis Date: Wed, 3 Apr 2024 11:32:19 +0100 Subject: [PATCH] Fix L2 repo to use the subscribers pattern like L1 repo --- go/common/host/services.go | 2 +- go/host/enclave/guardian.go | 2 ++ go/host/l2/batchrepository.go | 12 +++++++----- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/go/common/host/services.go b/go/common/host/services.go index d40b236025..cfdfced78e 100644 --- a/go/common/host/services.go +++ b/go/common/host/services.go @@ -122,7 +122,7 @@ type L1Publisher interface { // L2BatchRepository provides an interface for the host to request L2 batch data (live-streaming and historical) type L2BatchRepository interface { // Subscribe will register a batch handler to receive new batches as they arrive - Subscribe(handler L2BatchHandler) + Subscribe(handler L2BatchHandler) func() FetchBatchBySeqNo(seqNo *big.Int) (*common.ExtBatch, error) diff --git a/go/host/enclave/guardian.go b/go/host/enclave/guardian.go index d4d681ca50..fa2a0975bd 100644 --- a/go/host/enclave/guardian.go +++ b/go/host/enclave/guardian.go @@ -123,6 +123,8 @@ func (g *Guardian) Start() error { // subscribe for L1 and P2P data g.sl.P2P().SubscribeForTx(g) + + // note: not keeping the unsubscribe functions because the lifespan of the guardian is the same as the host g.sl.L1Repo().Subscribe(g) g.sl.L2Repo().Subscribe(g) diff --git a/go/host/l2/batchrepository.go b/go/host/l2/batchrepository.go index 7fbd46f556..c6a7daa077 100644 --- a/go/host/l2/batchrepository.go +++ b/go/host/l2/batchrepository.go @@ -12,6 +12,7 @@ import ( "github.com/ten-protocol/go-ten/go/common/errutil" "github.com/ten-protocol/go-ten/go/common/host" "github.com/ten-protocol/go-ten/go/common/log" + "github.com/ten-protocol/go-ten/go/common/subscription" "github.com/ten-protocol/go-ten/go/config" "github.com/ten-protocol/go-ten/go/host/db" ) @@ -33,7 +34,7 @@ type batchRepoServiceLocator interface { // Repository is responsible for storing and retrieving batches from the database // If it can't find a batch it will request it from peers. It also subscribes for batch requests from peers and responds to them. type Repository struct { - subscribers []host.L2BatchHandler + batchSubscribers *subscription.Manager[host.L2BatchHandler] sl batchRepoServiceLocator db *db.DB @@ -56,6 +57,7 @@ type Repository struct { func NewBatchRepository(cfg *config.HostConfig, hostService batchRepoServiceLocator, database *db.DB, logger gethlog.Logger) *Repository { return &Repository{ + batchSubscribers: subscription.NewManager[host.L2BatchHandler](), sl: hostService, db: database, isSequencer: cfg.NodeType == common.Sequencer, @@ -140,9 +142,9 @@ func (r *Repository) HandleBatchRequest(requesterID string, fromSeqNo *big.Int) } } -// Subscribe registers a handler to be notified of new head batches as they arrive -func (r *Repository) Subscribe(subscriber host.L2BatchHandler) { - r.subscribers = append(r.subscribers, subscriber) +// Subscribe registers a handler to be notified of new head batches as they arrive, returns unsubscribe func +func (r *Repository) Subscribe(handler host.L2BatchHandler) func() { + return r.batchSubscribers.Subscribe(handler) } func (r *Repository) FetchBatchBySeqNo(seqNo *big.Int) (*common.ExtBatch, error) { @@ -180,7 +182,7 @@ func (r *Repository) AddBatch(batch *common.ExtBatch) error { if batch.Header.SequencerOrderNo.Cmp(r.latestBatchSeqNo) > 0 { r.latestBatchSeqNo = batch.Header.SequencerOrderNo // notify subscribers, a new batch has been successfully added to the db - for _, subscriber := range r.subscribers { + for _, subscriber := range r.batchSubscribers.Subscribers() { go subscriber.HandleBatch(batch) } }