Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tudor/reconnect new heads #1910

Merged
merged 6 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 64 additions & 27 deletions go/common/subscription/new_heads_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package subscription

import (
"context"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"

gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -21,6 +23,7 @@ import (
// Note: this is a service which must be Started and Stopped
type NewHeadsService struct {
inputCh chan *common.BatchHeader
connectFunc func(chan *common.BatchHeader) error
convertToEthHeader bool
notifiersMutex *sync.RWMutex
newHeadNotifiers map[rpc.ID]*rpc.Notifier
Expand All @@ -29,9 +32,21 @@ type NewHeadsService struct {
logger gethlog.Logger
}

func NewNewHeadsService(inputCh chan *common.BatchHeader, convertToEthHeader bool, logger gethlog.Logger, onMessage func(*common.BatchHeader) error) *NewHeadsService {
func NewNewHeadsServiceWithConnect(connect func(chan *common.BatchHeader) error, convertToEthHeader bool, logger gethlog.Logger, onMessage func(*common.BatchHeader) error) *NewHeadsService {
return &NewHeadsService{
inputCh: inputCh,
connectFunc: connect,
convertToEthHeader: convertToEthHeader,
onMessage: onMessage,
logger: logger,
stopped: &atomic.Bool{},
newHeadNotifiers: make(map[rpc.ID]*rpc.Notifier),
notifiersMutex: &sync.RWMutex{},
}
}

func NewNewHeadsServiceWithChannel(ch chan *common.BatchHeader, convertToEthHeader bool, logger gethlog.Logger, onMessage func(*common.BatchHeader) error) *NewHeadsService {
return &NewHeadsService{
inputCh: ch,
convertToEthHeader: convertToEthHeader,
onMessage: onMessage,
logger: logger,
Expand All @@ -42,38 +57,60 @@ func NewNewHeadsService(inputCh chan *common.BatchHeader, convertToEthHeader boo
}

func (nhs *NewHeadsService) Start() error {
go ForwardFromChannels([]chan *common.BatchHeader{nhs.inputCh}, nhs.stopped, func(head *common.BatchHeader) error {
if nhs.onMessage != nil {
err := nhs.onMessage(head)
if err != nil {
nhs.logger.Info("failed invoking onMessage callback.", log.ErrKey, err)
}
if nhs.inputCh == nil {
nhs.inputCh = make(chan *common.BatchHeader)
err := nhs.connectFunc(nhs.inputCh)
if err != nil {
return fmt.Errorf("could not connect to new heads: %w", err)
}
}

var msg any = head
if nhs.convertToEthHeader {
msg = convertBatchHeader(head)
}
go ForwardFromChannels(
[]chan *common.BatchHeader{nhs.inputCh},
nhs.stopped,
func(head *common.BatchHeader) error {
if nhs.onMessage != nil {
err := nhs.onMessage(head)
if err != nil {
nhs.logger.Info("failed invoking onMessage callback.", log.ErrKey, err)
}
}

nhs.notifiersMutex.RLock()
defer nhs.notifiersMutex.RUnlock()
var msg any = head
if nhs.convertToEthHeader {
msg = convertBatchHeader(head)
}

nhs.notifiersMutex.RLock()
defer nhs.notifiersMutex.RUnlock()

// for each new head, notify all registered subscriptions
for id, notifier := range nhs.newHeadNotifiers {
if nhs.stopped.Load() {
return nil
// for each new head, notify all registered subscriptions
for id, notifier := range nhs.newHeadNotifiers {
if nhs.stopped.Load() {
return nil
}
err := notifier.Notify(id, msg)
if err != nil {
// on error, remove the notification
nhs.logger.Info("failed to notify newHead subscription", log.ErrKey, err, log.SubIDKey, id)
nhs.notifiersMutex.Lock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would deadlock: you can't take the write lock while the read lock is still held. You need to either copy the notifiers list before iterating, or collect the IDs to delete and remove them after the loop.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't changed this bit. Just reformatted. It might have been there before. I'll have a look

delete(nhs.newHeadNotifiers, id)
nhs.notifiersMutex.Unlock()
}
}
return nil
},
func() {
if nhs.connectFunc == nil {
nhs.logger.Crit("the inbound new heads channel was closed.")
}
err := notifier.Notify(id, msg)
err := nhs.connectFunc(nhs.inputCh)
if err != nil {
// on error, remove the notification
nhs.logger.Info("failed to notify newHead subscription", log.ErrKey, err, log.SubIDKey, id)
nhs.notifiersMutex.Lock()
delete(nhs.newHeadNotifiers, id)
nhs.notifiersMutex.Unlock()
nhs.logger.Crit("could not connect to new heads: ", err)
}
}
return nil
})
},
2*time.Minute, // todo - create constant
)
return nil
}

Expand Down
13 changes: 11 additions & 2 deletions go/common/subscription/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// ForwardFromChannels - reads messages from the input channels, and calls the `onMessage` callback.
// Exits when the unsubscribed flag is true.
// Must be called as a go routine!
func ForwardFromChannels[R any](inputChannels []chan R, unsubscribed *atomic.Bool, onMessage func(R) error) {
func ForwardFromChannels[R any](inputChannels []chan R, unsubscribed *atomic.Bool, onMessage func(R) error, onBackendDisconnect func(), timeoutInterval time.Duration) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd maybe rename the function from onBackendDisconnect to onTimeout to be clearer; I think that's the only time it's called.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is also called when the backend disconnects via the usual means.

Copy link
Collaborator

@BedrockSquirrel BedrockSquirrel May 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By "usual", do you mean when unsubscribed is set to true? If so, I think you need to change a return to a break in the for loop, unless I'm misreading; I couldn't see any other way to reach the call.

Copy link
Collaborator Author

@tudor-malene tudor-malene May 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this bit here:

		chosen, value, ok := reflect.Select(inputCases)
		if !ok {
			// The chosen channel has been closed, so zero out the channel to disable the case
			inputCases[chosen].Chan = reflect.ValueOf(nil)
			unclosedInputChannels--
			continue
		}

I'll add a comment to clarify it.
It's magic, lol

inputCases := make([]reflect.SelectCase, len(inputChannels)+1)

// create a ticker to handle cleanup, check the "unsubscribed" flag and exit the goroutine
Expand All @@ -25,6 +25,7 @@ func ForwardFromChannels[R any](inputChannels []chan R, unsubscribed *atomic.Boo
inputCases[i+1] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}

lastMessageTime := time.Now()
unclosedInputChannels := len(inputCases)
for unclosedInputChannels > 0 {
chosen, value, ok := reflect.Select(inputCases)
Expand All @@ -45,7 +46,12 @@ func ForwardFromChannels[R any](inputChannels []chan R, unsubscribed *atomic.Boo
if unsubscribed != nil && unsubscribed.Load() {
return
}
// no message was received for longer than the timeout interval. Exiting.
if time.Since(lastMessageTime) > timeoutInterval {
break
}
case R:
lastMessageTime = time.Now()
err := onMessage(v)
if err != nil {
// todo - log
Expand All @@ -56,11 +62,14 @@ func ForwardFromChannels[R any](inputChannels []chan R, unsubscribed *atomic.Boo
continue
}
}
if onBackendDisconnect != nil {
onBackendDisconnect()
}
}

// HandleUnsubscribe - when the client calls "unsubscribe" or the subscription times out, it calls `onUnsub`
// Must be called as a go routine!
func HandleUnsubscribe(connectionSub *rpc.Subscription, unsubscribed *atomic.Bool, onUnsub func()) {
func HandleUnsubscribe(connectionSub *rpc.Subscription, _ *atomic.Bool, onUnsub func()) {
<-connectionSub.Err()
onUnsub()
}
5 changes: 3 additions & 2 deletions go/host/rpc/clientapi/client_api_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/ten-protocol/go-ten/go/common/host"
subscriptioncommon "github.com/ten-protocol/go-ten/go/common/subscription"
Expand All @@ -28,7 +29,7 @@ func NewFilterAPI(host host.Host, logger gethlog.Logger) *FilterAPI {
return &FilterAPI{
host: host,
logger: logger,
NewHeadsService: subscriptioncommon.NewNewHeadsService(host.NewHeadsChan(), false, logger, nil),
NewHeadsService: subscriptioncommon.NewNewHeadsServiceWithChannel(host.NewHeadsChan(), false, logger, nil),
}
}

Expand Down Expand Up @@ -59,7 +60,7 @@ func (api *FilterAPI) Logs(ctx context.Context, encryptedParams common.Encrypted
var unsubscribed atomic.Bool
go subscriptioncommon.ForwardFromChannels([]chan []byte{logsFromSubscription}, &unsubscribed, func(elem []byte) error {
return notifier.Notify(subscription.ID, elem)
})
}, nil, 12*time.Hour)
go subscriptioncommon.HandleUnsubscribe(subscription, &unsubscribed, func() {
api.host.UnsubscribeLogs(subscription.ID)
unsubscribed.Store(true)
Expand Down
6 changes: 5 additions & 1 deletion go/rpc/encrypted_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"reflect"
"time"

"github.com/ten-protocol/go-ten/go/common/rpc"
"github.com/ten-protocol/go-ten/go/common/subscription"
Expand Down Expand Up @@ -273,7 +274,10 @@ func (c *EncRPCClient) logSubscription(ctx context.Context, namespace string, ch
outboundChannel <- *decryptedLog
}
return nil
})
},
nil,
12*time.Hour,
)

return backendSub, nil
}
Expand Down
31 changes: 19 additions & 12 deletions tools/walletextension/rpcapi/filter_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync/atomic"
"time"

subscriptioncommon "github.com/ten-protocol/go-ten/go/common/subscription"

Expand Down Expand Up @@ -93,19 +94,25 @@ func (api *FilterAPI) Logs(ctx context.Context, crit common.FilterCriteria) (*rp
subscription := subNotifier.CreateSubscription()

unsubscribed := atomic.Bool{}
go subscriptioncommon.ForwardFromChannels(inputChannels, &unsubscribed, func(log types.Log) error {
uniqueLogKey := LogKey{
BlockHash: log.BlockHash,
TxHash: log.TxHash,
Index: log.Index,
}
go subscriptioncommon.ForwardFromChannels(
inputChannels,
&unsubscribed,
func(log types.Log) error {
uniqueLogKey := LogKey{
BlockHash: log.BlockHash,
TxHash: log.TxHash,
Index: log.Index,
}

if !dedupeBuffer.Contains(uniqueLogKey) {
dedupeBuffer.Push(uniqueLogKey)
return subNotifier.Notify(subscription.ID, log)
}
return nil
})
if !dedupeBuffer.Contains(uniqueLogKey) {
dedupeBuffer.Push(uniqueLogKey)
return subNotifier.Notify(subscription.ID, log)
}
return nil
},
nil,
12*time.Hour,
)

go subscriptioncommon.HandleUnsubscribe(subscription, &unsubscribed, func() {
for _, backendSub := range backendSubscriptions {
Expand Down
67 changes: 34 additions & 33 deletions tools/walletextension/rpcapi/wallet_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ type Services struct {
version string
Cache cache.Cache
// the OG maintains a connection pool of rpc connections to underlying nodes
rpcHTTPConnPool *pool.ObjectPool
rpcWSConnPool *pool.ObjectPool
Config *common.Config
backendNewHeadsSubscription *gethrpc.ClientSubscription
NewHeadsService *subscriptioncommon.NewHeadsService
rpcHTTPConnPool *pool.ObjectPool
rpcWSConnPool *pool.ObjectPool
Config *common.Config
NewHeadsService *subscriptioncommon.NewHeadsService
}

type NewHeadNotifier interface {
Expand Down Expand Up @@ -107,42 +106,45 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage
Config: config,
}

services.NewHeadsService = subscriptioncommon.NewNewHeadsServiceWithConnect(
func(ch chan *tencommon.BatchHeader) error {
// clear the cache to avoid returning stale data during reconnecting.
services.Cache.EvictShortLiving()
return subscribeToNewHeadsWithReconnect(ch, services, logger)
},
true,
logger,
func(newHead *tencommon.BatchHeader) error {
services.Cache.EvictShortLiving()
return nil
})

return &services
}

func subscribeToNewHeadsWithReconnect(ch chan *tencommon.BatchHeader, services Services, logger gethlog.Logger) error {
connectionObj, err := services.rpcWSConnPool.BorrowObject(context.Background())
if err != nil {
panic(fmt.Errorf("cannot fetch rpc connection to backend node %w", err))
return fmt.Errorf("cannot fetch rpc connection to backend node %w", err)
}

rpcClient := connectionObj.(rpc.Client)
ch := make(chan *tencommon.BatchHeader)
clientSubscription, err := subscribeToNewHeadsWithRetry(rpcClient, ch, retry.NewTimeoutStrategy(10*time.Minute, 1*time.Second), logger)
if err != nil {
panic(fmt.Errorf("cannot subscribe to new heads to the backend %w", err))
}
services.backendNewHeadsSubscription = clientSubscription
services.NewHeadsService = subscriptioncommon.NewNewHeadsService(ch, true, logger, func(newHead *tencommon.BatchHeader) error {
services.Cache.EvictShortLiving()
return nil
})

return &services
}

func subscribeToNewHeadsWithRetry(rpcClient rpc.Client, ch chan *tencommon.BatchHeader, retryStrategy retry.Strategy, logger gethlog.Logger) (*gethrpc.ClientSubscription, error) {
var sub *gethrpc.ClientSubscription

err := retry.Do(func() error {
var err error
sub, err = rpcClient.Subscribe(context.Background(), rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeNewHeads)
if err != nil {
logger.Info("could not subscribe for new head blocks", log.ErrKey, err)
}
return err
}, retryStrategy)
err = retry.Do(
func() error {
_, err := rpcClient.Subscribe(context.Background(), rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeNewHeads)
if err != nil {
logger.Info("could not subscribe for new head blocks", log.ErrKey, err)
}
return err
},
retry.NewTimeoutStrategy(10*time.Minute, 1*time.Second),
)
if err != nil {
logger.Error("could not subscribe for new head blocks.", log.ErrKey, err)
return fmt.Errorf("cannot subscribe to new heads to the backend %w", err)
}

return sub, err
return nil
}

// IsStopping returns whether the WE is stopping
Expand Down Expand Up @@ -288,7 +290,6 @@ func (w *Services) GenerateUserMessageToSign(encryptionToken []byte, formatsSlic
}

func (w *Services) Stop() {
w.backendNewHeadsSubscription.Unsubscribe()
w.rpcHTTPConnPool.Close(context.Background())
w.rpcWSConnPool.Close(context.Background())
}
Loading