Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tudor/reconnect new heads #1910

Merged
merged 6 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 46 additions & 28 deletions go/common/subscription/new_heads_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package subscription

import (
"context"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"

gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -20,7 +22,7 @@ import (
// also handles unsubscribe
// Note: this is a service which must be Started and Stopped
type NewHeadsService struct {
inputCh chan *common.BatchHeader
connectFunc func() (chan *common.BatchHeader, error)
convertToEthHeader bool
notifiersMutex *sync.RWMutex
newHeadNotifiers map[rpc.ID]*rpc.Notifier
Expand All @@ -29,9 +31,10 @@ type NewHeadsService struct {
logger gethlog.Logger
}

func NewNewHeadsService(inputCh chan *common.BatchHeader, convertToEthHeader bool, logger gethlog.Logger, onMessage func(*common.BatchHeader) error) *NewHeadsService {
// connect - function that returns the input channel
func NewNewHeadsService(connect func() (chan *common.BatchHeader, error), convertToEthHeader bool, logger gethlog.Logger, onMessage func(*common.BatchHeader) error) *NewHeadsService {
return &NewHeadsService{
inputCh: inputCh,
connectFunc: connect,
convertToEthHeader: convertToEthHeader,
onMessage: onMessage,
logger: logger,
Expand All @@ -42,38 +45,53 @@ func NewNewHeadsService(inputCh chan *common.BatchHeader, convertToEthHeader boo
}

func (nhs *NewHeadsService) Start() error {
go ForwardFromChannels([]chan *common.BatchHeader{nhs.inputCh}, nhs.stopped, func(head *common.BatchHeader) error {
if nhs.onMessage != nil {
err := nhs.onMessage(head)
if err != nil {
nhs.logger.Info("failed invoking onMessage callback.", log.ErrKey, err)
inputCh, err := nhs.connectFunc()
if err != nil {
return fmt.Errorf("could not connect to new heads: %w", err)
}

go ForwardFromChannels(
[]chan *common.BatchHeader{inputCh},
nhs.stopped,
func(head *common.BatchHeader) error {
if nhs.onMessage != nil {
err := nhs.onMessage(head)
if err != nil {
nhs.logger.Info("failed invoking onMessage callback.", log.ErrKey, err)
}
}
}

var msg any = head
if nhs.convertToEthHeader {
msg = convertBatchHeader(head)
}
var msg any = head
if nhs.convertToEthHeader {
msg = convertBatchHeader(head)
}

nhs.notifiersMutex.RLock()
defer nhs.notifiersMutex.RUnlock()
nhs.notifiersMutex.Lock()
defer nhs.notifiersMutex.Unlock()

// for each new head, notify all registered subscriptions
for id, notifier := range nhs.newHeadNotifiers {
if nhs.stopped.Load() {
return nil
// for each new head, notify all registered subscriptions
for id, notifier := range nhs.newHeadNotifiers {
if nhs.stopped.Load() {
return nil
}
err := notifier.Notify(id, msg)
if err != nil {
// on error, remove the notification
nhs.logger.Info("failed to notify newHead subscription", log.ErrKey, err, log.SubIDKey, id)
delete(nhs.newHeadNotifiers, id)
}
}
err := notifier.Notify(id, msg)
return nil
},
func() {
nhs.logger.Info("Disconnected from new head subscription. Reconnecting...")
inputCh, err = nhs.connectFunc()
Copy link
Collaborator

@BedrockSquirrel BedrockSquirrel May 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this assignment has the desired effect. It won't update the channel in the slice that was passed into ForwardFromChannels above.

To do that you'd need to reference the slice, something like:

inputChans := []chan *common.BatchHeader{inputCh}

// and then down here you can write:			
inputCh, err = nhs.connectFunc()
inputChans[0] = inputCh

I wasn't sure how this worked, so did this to convince myself lol: https://go.dev/play/p/YD7x0HaUtoO

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually even that fix might be problematic I think (if I'm reading the magical incantations correctly).

The inputCases you build up with reflection at the start of ForwardFromChannels() will still be listening on the old channel.

if err != nil {
// on error, remove the notification
nhs.logger.Info("failed to notify newHead subscription", log.ErrKey, err, log.SubIDKey, id)
nhs.notifiersMutex.Lock()
delete(nhs.newHeadNotifiers, id)
nhs.notifiersMutex.Unlock()
nhs.logger.Crit("could not connect to new heads: ", err)
}
}
return nil
})
},
2*time.Minute, // todo - create constant
)
return nil
}

Expand Down
14 changes: 12 additions & 2 deletions go/common/subscription/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// ForwardFromChannels - reads messages from the input channels, and calls the `onMessage` callback.
// Exits when the unsubscribed flag is true.
// Must be called as a go routine!
func ForwardFromChannels[R any](inputChannels []chan R, unsubscribed *atomic.Bool, onMessage func(R) error) {
func ForwardFromChannels[R any](inputChannels []chan R, unsubscribed *atomic.Bool, onMessage func(R) error, onBackendDisconnect func(), timeoutInterval time.Duration) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd maybe call the function onBackendDisconnect -> onTimeout to be clearer, I think that's the only time it's called.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is also called when the backend disconnects via the usual means.

Copy link
Collaborator

@BedrockSquirrel BedrockSquirrel May 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By "usual", do you mean when unsubscribed is set to true? If so, I think you need to change a return to a break in the for loop, unless I'm misreading; I couldn't see any other way to reach the call.

Copy link
Collaborator Author

@tudor-malene tudor-malene May 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this bit here:

		chosen, value, ok := reflect.Select(inputCases)
		if !ok {
			// The chosen channel has been closed, so zero out the channel to disable the case
			inputCases[chosen].Chan = reflect.ValueOf(nil)
			unclosedInputChannels--
			continue
		}

I'll add a comment to clarify it.
It's magic, lol

inputCases := make([]reflect.SelectCase, len(inputChannels)+1)

// create a ticker to handle cleanup, check the "unsubscribed" flag and exit the goroutine
Expand All @@ -25,8 +25,10 @@ func ForwardFromChannels[R any](inputChannels []chan R, unsubscribed *atomic.Boo
inputCases[i+1] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}

lastMessageTime := time.Now()
unclosedInputChannels := len(inputCases)
for unclosedInputChannels > 0 {
// this mechanism removes closed input channels. When there are none left, the subscription is considered "disconnected".
chosen, value, ok := reflect.Select(inputCases)
if !ok {
// The chosen channel has been closed, so zero out the channel to disable the case
Expand All @@ -45,7 +47,12 @@ func ForwardFromChannels[R any](inputChannels []chan R, unsubscribed *atomic.Boo
if unsubscribed != nil && unsubscribed.Load() {
return
}
// no message was received for longer than the timeout. Exiting.
if time.Since(lastMessageTime) > timeoutInterval {
break
}
case R:
lastMessageTime = time.Now()
err := onMessage(v)
if err != nil {
// todo - log
Expand All @@ -56,11 +63,14 @@ func ForwardFromChannels[R any](inputChannels []chan R, unsubscribed *atomic.Boo
continue
}
}
if onBackendDisconnect != nil {
onBackendDisconnect()
}
}

// HandleUnsubscribe - when the client calls "unsubscribe" or the subscription times out, it calls `onUnsub`
// Must be called as a go routine!
func HandleUnsubscribe(connectionSub *rpc.Subscription, unsubscribed *atomic.Bool, onUnsub func()) {
func HandleUnsubscribe(connectionSub *rpc.Subscription, _ *atomic.Bool, onUnsub func()) {
<-connectionSub.Err()
onUnsub()
}
4 changes: 2 additions & 2 deletions go/enclave/storage/init/edgelessdb/001_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ create table if not exists obsdb.batch
is_executed boolean NOT NULL,
primary key (sequence),
INDEX USING HASH (hash(8)),
INDEX USING HASH (l1_proof_hash(8)),
INDEX (body, l1_proof),
INDEX (body),
INDEX (l1_proof),
INDEX (height)
);
GRANT ALL ON obsdb.batch TO obscuro;
Expand Down
3 changes: 2 additions & 1 deletion go/enclave/storage/init/sqlite/001_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ create table if not exists batch
);
create index IDX_BATCH_HASH on batch (hash);
create index IDX_BATCH_BLOCK on batch (l1_proof_hash);
create index IDX_BATCH_BODY on batch (body, l1_proof);
create index IDX_BATCH_BODY on batch (body);
create index IDX_BATCH_L1 on batch (l1_proof);
create index IDX_BATCH_HEIGHT on batch (height);

create table if not exists tx
Expand Down
16 changes: 12 additions & 4 deletions go/host/rpc/clientapi/client_api_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/ten-protocol/go-ten/go/common/host"
subscriptioncommon "github.com/ten-protocol/go-ten/go/common/subscription"
Expand All @@ -26,9 +27,16 @@ type FilterAPI struct {

func NewFilterAPI(host host.Host, logger gethlog.Logger) *FilterAPI {
return &FilterAPI{
host: host,
logger: logger,
NewHeadsService: subscriptioncommon.NewNewHeadsService(host.NewHeadsChan(), false, logger, nil),
host: host,
logger: logger,
NewHeadsService: subscriptioncommon.NewNewHeadsService(
func() (chan *common.BatchHeader, error) {
return host.NewHeadsChan(), nil
},
false,
logger,
nil,
),
}
}

Expand Down Expand Up @@ -59,7 +67,7 @@ func (api *FilterAPI) Logs(ctx context.Context, encryptedParams common.Encrypted
var unsubscribed atomic.Bool
go subscriptioncommon.ForwardFromChannels([]chan []byte{logsFromSubscription}, &unsubscribed, func(elem []byte) error {
return notifier.Notify(subscription.ID, elem)
})
}, nil, 12*time.Hour)
go subscriptioncommon.HandleUnsubscribe(subscription, &unsubscribed, func() {
api.host.UnsubscribeLogs(subscription.ID)
unsubscribed.Store(true)
Expand Down
6 changes: 5 additions & 1 deletion go/rpc/encrypted_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"reflect"
"time"

"github.com/ten-protocol/go-ten/go/common/rpc"
"github.com/ten-protocol/go-ten/go/common/subscription"
Expand Down Expand Up @@ -273,7 +274,10 @@ func (c *EncRPCClient) logSubscription(ctx context.Context, namespace string, ch
outboundChannel <- *decryptedLog
}
return nil
})
},
nil,
12*time.Hour,
)

return backendSub, nil
}
Expand Down
31 changes: 19 additions & 12 deletions tools/walletextension/rpcapi/filter_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync/atomic"
"time"

subscriptioncommon "github.com/ten-protocol/go-ten/go/common/subscription"

Expand Down Expand Up @@ -93,19 +94,25 @@ func (api *FilterAPI) Logs(ctx context.Context, crit common.FilterCriteria) (*rp
subscription := subNotifier.CreateSubscription()

unsubscribed := atomic.Bool{}
go subscriptioncommon.ForwardFromChannels(inputChannels, &unsubscribed, func(log types.Log) error {
uniqueLogKey := LogKey{
BlockHash: log.BlockHash,
TxHash: log.TxHash,
Index: log.Index,
}
go subscriptioncommon.ForwardFromChannels(
inputChannels,
&unsubscribed,
func(log types.Log) error {
uniqueLogKey := LogKey{
BlockHash: log.BlockHash,
TxHash: log.TxHash,
Index: log.Index,
}

if !dedupeBuffer.Contains(uniqueLogKey) {
dedupeBuffer.Push(uniqueLogKey)
return subNotifier.Notify(subscription.ID, log)
}
return nil
})
if !dedupeBuffer.Contains(uniqueLogKey) {
dedupeBuffer.Push(uniqueLogKey)
return subNotifier.Notify(subscription.ID, log)
}
return nil
},
nil,
12*time.Hour,
)

go subscriptioncommon.HandleUnsubscribe(subscription, &unsubscribed, func() {
for _, backendSub := range backendSubscriptions {
Expand Down
71 changes: 38 additions & 33 deletions tools/walletextension/rpcapi/wallet_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ type Services struct {
version string
Cache cache.Cache
// the OG maintains a connection pool of rpc connections to underlying nodes
rpcHTTPConnPool *pool.ObjectPool
rpcWSConnPool *pool.ObjectPool
Config *common.Config
backendNewHeadsSubscription *gethrpc.ClientSubscription
NewHeadsService *subscriptioncommon.NewHeadsService
rpcHTTPConnPool *pool.ObjectPool
rpcWSConnPool *pool.ObjectPool
Config *common.Config
NewHeadsService *subscriptioncommon.NewHeadsService
}

type NewHeadNotifier interface {
Expand Down Expand Up @@ -107,42 +106,49 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage
Config: config,
}

services.NewHeadsService = subscriptioncommon.NewNewHeadsService(
func() (chan *tencommon.BatchHeader, error) {
logger.Info("Connecting to new heads service...")
// clear the cache to avoid returning stale data during reconnecting.
services.Cache.EvictShortLiving()
ch := make(chan *tencommon.BatchHeader)
err := subscribeToNewHeadsWithReconnect(ch, services, logger)
logger.Info("Connected to new heads service.", log.ErrKey, err)
return ch, err
},
true,
logger,
func(newHead *tencommon.BatchHeader) error {
services.Cache.EvictShortLiving()
return nil
})

return &services
}

func subscribeToNewHeadsWithReconnect(ch chan *tencommon.BatchHeader, services Services, logger gethlog.Logger) error {
connectionObj, err := services.rpcWSConnPool.BorrowObject(context.Background())
if err != nil {
panic(fmt.Errorf("cannot fetch rpc connection to backend node %w", err))
return fmt.Errorf("cannot fetch rpc connection to backend node %w", err)
}

rpcClient := connectionObj.(rpc.Client)
ch := make(chan *tencommon.BatchHeader)
clientSubscription, err := subscribeToNewHeadsWithRetry(rpcClient, ch, retry.NewTimeoutStrategy(10*time.Minute, 1*time.Second), logger)
if err != nil {
panic(fmt.Errorf("cannot subscribe to new heads to the backend %w", err))
}
services.backendNewHeadsSubscription = clientSubscription
services.NewHeadsService = subscriptioncommon.NewNewHeadsService(ch, true, logger, func(newHead *tencommon.BatchHeader) error {
services.Cache.EvictShortLiving()
return nil
})

return &services
}

func subscribeToNewHeadsWithRetry(rpcClient rpc.Client, ch chan *tencommon.BatchHeader, retryStrategy retry.Strategy, logger gethlog.Logger) (*gethrpc.ClientSubscription, error) {
var sub *gethrpc.ClientSubscription

err := retry.Do(func() error {
var err error
sub, err = rpcClient.Subscribe(context.Background(), rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeNewHeads)
if err != nil {
logger.Info("could not subscribe for new head blocks", log.ErrKey, err)
}
return err
}, retryStrategy)
err = retry.Do(
func() error {
_, err := rpcClient.Subscribe(context.Background(), rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeNewHeads)
if err != nil {
logger.Info("could not subscribe for new head blocks", log.ErrKey, err)
}
return err
},
retry.NewTimeoutStrategy(10*time.Minute, 1*time.Second),
)
if err != nil {
logger.Error("could not subscribe for new head blocks.", log.ErrKey, err)
return fmt.Errorf("cannot subscribe to new heads to the backend %w", err)
}

return sub, err
return nil
}

// IsStopping returns whether the WE is stopping
Expand Down Expand Up @@ -288,7 +294,6 @@ func (w *Services) GenerateUserMessageToSign(encryptionToken []byte, formatsSlic
}

func (w *Services) Stop() {
w.backendNewHeadsSubscription.Unsubscribe()
w.rpcHTTPConnPool.Close(context.Background())
w.rpcWSConnPool.Close(context.Background())
}
Loading