Skip to content

Commit

Permalink
Fix L2 repo to use the subscribers pattern like L1 repo
Browse files Browse the repository at this point in the history
  • Loading branch information
BedrockSquirrel committed Apr 3, 2024
1 parent ef55e3d commit fd68eea
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 6 deletions.
2 changes: 1 addition & 1 deletion go/common/host/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ type L1Publisher interface {
// L2BatchRepository provides an interface for the host to request L2 batch data (live-streaming and historical)
type L2BatchRepository interface {
// Subscribe will register a batch handler to receive new batches as they arrive
Subscribe(handler L2BatchHandler)
Subscribe(handler L2BatchHandler) func()

FetchBatchBySeqNo(seqNo *big.Int) (*common.ExtBatch, error)

Expand Down
2 changes: 2 additions & 0 deletions go/host/enclave/guardian.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func (g *Guardian) Start() error {

// subscribe for L1 and P2P data
g.sl.P2P().SubscribeForTx(g)

// note: not keeping the unsubscribe functions because the lifespan of the guardian is the same as the host
g.sl.L1Repo().Subscribe(g)
g.sl.L2Repo().Subscribe(g)

Expand Down
12 changes: 7 additions & 5 deletions go/host/l2/batchrepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ten-protocol/go-ten/go/common/errutil"
"github.com/ten-protocol/go-ten/go/common/host"
"github.com/ten-protocol/go-ten/go/common/log"
"github.com/ten-protocol/go-ten/go/common/subscription"
"github.com/ten-protocol/go-ten/go/config"
"github.com/ten-protocol/go-ten/go/host/db"
)
Expand All @@ -33,7 +34,7 @@ type batchRepoServiceLocator interface {
// Repository is responsible for storing and retrieving batches from the database
// If it can't find a batch it will request it from peers. It also subscribes for batch requests from peers and responds to them.
type Repository struct {
subscribers []host.L2BatchHandler
batchSubscribers *subscription.Manager[host.L2BatchHandler]

sl batchRepoServiceLocator
db *db.DB
Expand All @@ -56,6 +57,7 @@ type Repository struct {

func NewBatchRepository(cfg *config.HostConfig, hostService batchRepoServiceLocator, database *db.DB, logger gethlog.Logger) *Repository {
return &Repository{
batchSubscribers: subscription.NewManager[host.L2BatchHandler](),
sl: hostService,
db: database,
isSequencer: cfg.NodeType == common.Sequencer,
Expand Down Expand Up @@ -140,9 +142,9 @@ func (r *Repository) HandleBatchRequest(requesterID string, fromSeqNo *big.Int)
}
}

// Subscribe registers a handler to be notified of new head batches as they arrive
func (r *Repository) Subscribe(subscriber host.L2BatchHandler) {
r.subscribers = append(r.subscribers, subscriber)
// Subscribe registers a handler to be notified of new head batches as they arrive, returns unsubscribe func
func (r *Repository) Subscribe(handler host.L2BatchHandler) func() {
return r.batchSubscribers.Subscribe(handler)
}

func (r *Repository) FetchBatchBySeqNo(seqNo *big.Int) (*common.ExtBatch, error) {
Expand Down Expand Up @@ -180,7 +182,7 @@ func (r *Repository) AddBatch(batch *common.ExtBatch) error {
if batch.Header.SequencerOrderNo.Cmp(r.latestBatchSeqNo) > 0 {
r.latestBatchSeqNo = batch.Header.SequencerOrderNo
// notify subscribers, a new batch has been successfully added to the db
for _, subscriber := range r.subscribers {
for _, subscriber := range r.batchSubscribers.Subscribers() {
go subscriber.HandleBatch(batch)
}
}
Expand Down

0 comments on commit fd68eea

Please sign in to comment.