Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tudor/reconnect new heads #1910

Merged
merged 6 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 11 additions & 30 deletions go/common/subscription/new_heads_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import (
// also handles unsubscribe
// Note: this is a service which must be Started and Stopped
type NewHeadsService struct {
inputCh chan *common.BatchHeader
connectFunc func(chan *common.BatchHeader) error
connectFunc func() (chan *common.BatchHeader, error)
convertToEthHeader bool
notifiersMutex *sync.RWMutex
newHeadNotifiers map[rpc.ID]*rpc.Notifier
Expand All @@ -32,7 +31,8 @@ type NewHeadsService struct {
logger gethlog.Logger
}

func NewNewHeadsServiceWithConnect(connect func(chan *common.BatchHeader) error, convertToEthHeader bool, logger gethlog.Logger, onMessage func(*common.BatchHeader) error) *NewHeadsService {
// connect - function that returns the input channel
func NewNewHeadsService(connect func() (chan *common.BatchHeader, error), convertToEthHeader bool, logger gethlog.Logger, onMessage func(*common.BatchHeader) error) *NewHeadsService {
return &NewHeadsService{
connectFunc: connect,
convertToEthHeader: convertToEthHeader,
Expand All @@ -44,29 +44,14 @@ func NewNewHeadsServiceWithConnect(connect func(chan *common.BatchHeader) error,
}
}

func NewNewHeadsServiceWithChannel(ch chan *common.BatchHeader, convertToEthHeader bool, logger gethlog.Logger, onMessage func(*common.BatchHeader) error) *NewHeadsService {
return &NewHeadsService{
inputCh: ch,
convertToEthHeader: convertToEthHeader,
onMessage: onMessage,
logger: logger,
stopped: &atomic.Bool{},
newHeadNotifiers: make(map[rpc.ID]*rpc.Notifier),
notifiersMutex: &sync.RWMutex{},
}
}

func (nhs *NewHeadsService) Start() error {
if nhs.inputCh == nil {
nhs.inputCh = make(chan *common.BatchHeader)
err := nhs.connectFunc(nhs.inputCh)
if err != nil {
return fmt.Errorf("could not connect to new heads: %w", err)
}
inputCh, err := nhs.connectFunc()
if err != nil {
return fmt.Errorf("could not connect to new heads: %w", err)
}

go ForwardFromChannels(
[]chan *common.BatchHeader{nhs.inputCh},
[]chan *common.BatchHeader{inputCh},
nhs.stopped,
func(head *common.BatchHeader) error {
if nhs.onMessage != nil {
Expand All @@ -81,8 +66,8 @@ func (nhs *NewHeadsService) Start() error {
msg = convertBatchHeader(head)
}

nhs.notifiersMutex.RLock()
defer nhs.notifiersMutex.RUnlock()
nhs.notifiersMutex.Lock()
defer nhs.notifiersMutex.Unlock()

// for each new head, notify all registered subscriptions
for id, notifier := range nhs.newHeadNotifiers {
Expand All @@ -93,18 +78,14 @@ func (nhs *NewHeadsService) Start() error {
if err != nil {
// on error, remove the notification
nhs.logger.Info("failed to notify newHead subscription", log.ErrKey, err, log.SubIDKey, id)
nhs.notifiersMutex.Lock()
delete(nhs.newHeadNotifiers, id)
nhs.notifiersMutex.Unlock()
}
}
return nil
},
func() {
if nhs.connectFunc == nil {
nhs.logger.Crit("the inbound new heads channel was closed.")
}
err := nhs.connectFunc(nhs.inputCh)
nhs.logger.Info("Disconnected from new head subscription. Reconnecting...")
inputCh, err = nhs.connectFunc()
Copy link
Collaborator

@BedrockSquirrel BedrockSquirrel May 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assignment doesn't have the desired effect I don't think. It won't update the channel in the slice that was passed into ForwardFromChannels above.

To do that you'd need to reference the slice, something like:

inputChans := []chan *common.BatchHeader{inputCh}

// and then down here you can write:			
inputCh, err = nhs.connectFunc()
inputChans[0] = inputCh

I wasn't sure how this worked, so did this to convince myself lol: https://go.dev/play/p/YD7x0HaUtoO

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually even that fix might be problematic I think (if I'm reading the magical incantations correctly).

The inputCases you build up with reflection at the start of ForwardFromChannels() will still be listening on the old channel.

if err != nil {
nhs.logger.Crit("could not connect to new heads: ", err)
}
Expand Down
1 change: 1 addition & 0 deletions go/common/subscription/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func ForwardFromChannels[R any](inputChannels []chan R, unsubscribed *atomic.Boo
lastMessageTime := time.Now()
unclosedInputChannels := len(inputCases)
for unclosedInputChannels > 0 {
// this mechanism removes closed input channels. When there is none left, the subscription is considered "disconnected".
chosen, value, ok := reflect.Select(inputCases)
if !ok {
// The chosen channel has been closed, so zero out the channel to disable the case
Expand Down
4 changes: 2 additions & 2 deletions go/enclave/storage/init/edgelessdb/001_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ create table if not exists obsdb.batch
is_executed boolean NOT NULL,
primary key (sequence),
INDEX USING HASH (hash(8)),
INDEX USING HASH (l1_proof_hash(8)),
INDEX (body, l1_proof),
INDEX (body),
INDEX (l1_proof),
INDEX (height)
);
GRANT ALL ON obsdb.batch TO obscuro;
Expand Down
3 changes: 2 additions & 1 deletion go/enclave/storage/init/sqlite/001_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ create table if not exists batch
);
create index IDX_BATCH_HASH on batch (hash);
create index IDX_BATCH_BLOCK on batch (l1_proof_hash);
create index IDX_BATCH_BODY on batch (body, l1_proof);
create index IDX_BATCH_BODY on batch (body);
create index IDX_BATCH_L1 on batch (l1_proof);
create index IDX_BATCH_HEIGHT on batch (height);

create table if not exists tx
Expand Down
13 changes: 10 additions & 3 deletions go/host/rpc/clientapi/client_api_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,16 @@ type FilterAPI struct {

func NewFilterAPI(host host.Host, logger gethlog.Logger) *FilterAPI {
return &FilterAPI{
host: host,
logger: logger,
NewHeadsService: subscriptioncommon.NewNewHeadsServiceWithChannel(host.NewHeadsChan(), false, logger, nil),
host: host,
logger: logger,
NewHeadsService: subscriptioncommon.NewNewHeadsService(
func() (chan *common.BatchHeader, error) {
return host.NewHeadsChan(), nil
},
false,
logger,
nil,
),
}
}

Expand Down
10 changes: 7 additions & 3 deletions tools/walletextension/rpcapi/wallet_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,15 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage
Config: config,
}

services.NewHeadsService = subscriptioncommon.NewNewHeadsServiceWithConnect(
func(ch chan *tencommon.BatchHeader) error {
services.NewHeadsService = subscriptioncommon.NewNewHeadsService(
func() (chan *tencommon.BatchHeader, error) {
logger.Info("Connecting to new heads service...")
// clear the cache to avoid returning stale data during reconnecting.
services.Cache.EvictShortLiving()
return subscribeToNewHeadsWithReconnect(ch, services, logger)
ch := make(chan *tencommon.BatchHeader)
err := subscribeToNewHeadsWithReconnect(ch, services, logger)
logger.Info("Connected to new heads service.", log.ErrKey, err)
return ch, err
},
true,
logger,
Expand Down
Loading