Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Host: Fix L2 repo to use the subscriptions utility #1863

Merged
merged 1 commit into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/common/host/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ type L1Publisher interface {
// L2BatchRepository provides an interface for the host to request L2 batch data (live-streaming and historical)
type L2BatchRepository interface {
// Subscribe will register a batch handler to receive new batches as they arrive
Subscribe(handler L2BatchHandler)
Subscribe(handler L2BatchHandler) func()

FetchBatchBySeqNo(seqNo *big.Int) (*common.ExtBatch, error)

Expand Down
2 changes: 2 additions & 0 deletions go/host/enclave/guardian.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func (g *Guardian) Start() error {

// subscribe for L1 and P2P data
g.sl.P2P().SubscribeForTx(g)

// note: not keeping the unsubscribe functions because the lifespan of the guardian is the same as the host
g.sl.L1Repo().Subscribe(g)
g.sl.L2Repo().Subscribe(g)

Expand Down
12 changes: 7 additions & 5 deletions go/host/l2/batchrepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ten-protocol/go-ten/go/common/errutil"
"github.com/ten-protocol/go-ten/go/common/host"
"github.com/ten-protocol/go-ten/go/common/log"
"github.com/ten-protocol/go-ten/go/common/subscription"
"github.com/ten-protocol/go-ten/go/config"
"github.com/ten-protocol/go-ten/go/host/db"
)
Expand All @@ -33,7 +34,7 @@ type batchRepoServiceLocator interface {
// Repository is responsible for storing and retrieving batches from the database
// If it can't find a batch it will request it from peers. It also subscribes for batch requests from peers and responds to them.
type Repository struct {
subscribers []host.L2BatchHandler
batchSubscribers *subscription.Manager[host.L2BatchHandler]

sl batchRepoServiceLocator
db *db.DB
Expand All @@ -56,6 +57,7 @@ type Repository struct {

func NewBatchRepository(cfg *config.HostConfig, hostService batchRepoServiceLocator, database *db.DB, logger gethlog.Logger) *Repository {
return &Repository{
batchSubscribers: subscription.NewManager[host.L2BatchHandler](),
sl: hostService,
db: database,
isSequencer: cfg.NodeType == common.Sequencer,
Expand Down Expand Up @@ -140,9 +142,9 @@ func (r *Repository) HandleBatchRequest(requesterID string, fromSeqNo *big.Int)
}
}

// Subscribe registers a handler to be notified of new head batches as they arrive
func (r *Repository) Subscribe(subscriber host.L2BatchHandler) {
r.subscribers = append(r.subscribers, subscriber)
// Subscribe registers a handler to be notified of new head batches as they arrive, returns unsubscribe func
func (r *Repository) Subscribe(handler host.L2BatchHandler) func() {
return r.batchSubscribers.Subscribe(handler)
}

func (r *Repository) FetchBatchBySeqNo(seqNo *big.Int) (*common.ExtBatch, error) {
Expand Down Expand Up @@ -180,7 +182,7 @@ func (r *Repository) AddBatch(batch *common.ExtBatch) error {
if batch.Header.SequencerOrderNo.Cmp(r.latestBatchSeqNo) > 0 {
r.latestBatchSeqNo = batch.Header.SequencerOrderNo
// notify subscribers, a new batch has been successfully added to the db
for _, subscriber := range r.subscribers {
for _, subscriber := range r.batchSubscribers.Subscribers() {
go subscriber.HandleBatch(batch)
}
}
Expand Down
Loading