Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tudor/reconnect new heads #1910

Merged
merged 6 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 64 additions & 32 deletions go/common/subscription/new_heads_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/big"
"sync"
"sync/atomic"
"time"

gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -20,7 +21,7 @@ import (
// also handles unsubscribe
// Note: this is a service which must be Started and Stopped
type NewHeadsService struct {
inputCh chan *common.BatchHeader
connectFunc func() (chan *common.BatchHeader, <-chan error, error)
convertToEthHeader bool
notifiersMutex *sync.RWMutex
newHeadNotifiers map[rpc.ID]*rpc.Notifier
Expand All @@ -29,9 +30,10 @@ type NewHeadsService struct {
logger gethlog.Logger
}

func NewNewHeadsService(inputCh chan *common.BatchHeader, convertToEthHeader bool, logger gethlog.Logger, onMessage func(*common.BatchHeader) error) *NewHeadsService {
// connect - function that returns the input channel
func NewNewHeadsService(connect func() (chan *common.BatchHeader, <-chan error, error), convertToEthHeader bool, logger gethlog.Logger, onMessage func(*common.BatchHeader) error) *NewHeadsService {
return &NewHeadsService{
inputCh: inputCh,
connectFunc: connect,
convertToEthHeader: convertToEthHeader,
onMessage: onMessage,
logger: logger,
Expand All @@ -42,38 +44,68 @@ func NewNewHeadsService(inputCh chan *common.BatchHeader, convertToEthHeader boo
}

func (nhs *NewHeadsService) Start() error {
go ForwardFromChannels([]chan *common.BatchHeader{nhs.inputCh}, nhs.stopped, func(head *common.BatchHeader) error {
if nhs.onMessage != nil {
err := nhs.onMessage(head)
if err != nil {
nhs.logger.Info("failed invoking onMessage callback.", log.ErrKey, err)
}
}
nhs.reconnect()
return nil
}

func (nhs *NewHeadsService) reconnect() {
// reconnect to the backend and restart the listening
newCh, errCh, err := nhs.connectFunc()
if err != nil {
nhs.logger.Crit("could not connect to new heads: ", log.ErrKey, err)
}
nhs._subscribe(newCh, errCh)
}

func (nhs *NewHeadsService) _subscribe(inputCh chan *common.BatchHeader, errChan <-chan error) {
backedUnsub := &atomic.Bool{}
go HandleUnsubscribeErrChan([]<-chan error{errChan}, func() {
backedUnsub.Store(true)
})
go ForwardFromChannels(
[]chan *common.BatchHeader{inputCh},
func(head *common.BatchHeader) error {
return nhs.onNewBatch(head)
},
func() {
nhs.logger.Info("Disconnected from new head subscription. Reconnecting...")
nhs.reconnect()
},
backedUnsub,
nhs.stopped,
2*time.Minute, // todo - create constant
nhs.logger,
)
}

var msg any = head
if nhs.convertToEthHeader {
msg = convertBatchHeader(head)
func (nhs *NewHeadsService) onNewBatch(head *common.BatchHeader) error {
if nhs.onMessage != nil {
err := nhs.onMessage(head)
if err != nil {
nhs.logger.Info("failed invoking onMessage callback.", log.ErrKey, err)
}
}

var msg any = head
if nhs.convertToEthHeader {
msg = convertBatchHeader(head)
}

nhs.notifiersMutex.RLock()
defer nhs.notifiersMutex.RUnlock()

// for each new head, notify all registered subscriptions
for id, notifier := range nhs.newHeadNotifiers {
if nhs.stopped.Load() {
return nil
}
err := notifier.Notify(id, msg)
if err != nil {
// on error, remove the notification
nhs.logger.Info("failed to notify newHead subscription", log.ErrKey, err, log.SubIDKey, id)
nhs.notifiersMutex.Lock()
delete(nhs.newHeadNotifiers, id)
nhs.notifiersMutex.Unlock()
}
nhs.notifiersMutex.Lock()
defer nhs.notifiersMutex.Unlock()

// for each new head, notify all registered subscriptions
for id, notifier := range nhs.newHeadNotifiers {
if nhs.stopped.Load() {
return nil
}
return nil
})
err := notifier.Notify(id, msg)
if err != nil {
// on error, remove the notification
nhs.logger.Info("failed to notify newHead subscription", log.ErrKey, err, log.SubIDKey, id)
delete(nhs.newHeadNotifiers, id)
}
}
return nil
}

Expand All @@ -82,7 +114,7 @@ func (nhs *NewHeadsService) RegisterNotifier(notifier *rpc.Notifier, subscriptio
defer nhs.notifiersMutex.Unlock()
nhs.newHeadNotifiers[subscription.ID] = notifier

go HandleUnsubscribe(subscription, nil, func() {
go HandleUnsubscribe(subscription, func() {
nhs.notifiersMutex.Lock()
defer nhs.notifiersMutex.Unlock()
delete(nhs.newHeadNotifiers, subscription.ID)
Expand Down
64 changes: 45 additions & 19 deletions go/common/subscription/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ import (
"sync/atomic"
"time"

gethlog "github.com/ethereum/go-ethereum/log"
"github.com/ten-protocol/go-ten/go/common/log"

"github.com/ten-protocol/go-ten/lib/gethfork/rpc"
)

// ForwardFromChannels - reads messages from the input channels, and calls the `onMessage` callback.
// Exits when the unsubscribed flag is true.
// ForwardFromChannels - reads messages from all input channels, and calls the `onMessage` callback.
// Exits when the "stopped" flag is true or when the connection times out.
// Must be called as a go routine!
func ForwardFromChannels[R any](inputChannels []chan R, unsubscribed *atomic.Bool, onMessage func(R) error) {
func ForwardFromChannels[R any](inputChannels []chan R, onMessage func(R) error, onBackendDisconnect func(), backendDisconnected *atomic.Bool, stopped *atomic.Bool, timeoutInterval time.Duration, logger gethlog.Logger) {
inputCases := make([]reflect.SelectCase, len(inputChannels)+1)

// create a ticker to handle cleanup, check the "unsubscribed" flag and exit the goroutine
// create a ticker to handle cleanup, check the "stopped" flag and exit the goroutine
inputCases[0] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(time.NewTicker(2 * time.Second).C),
Expand All @@ -25,42 +28,65 @@ func ForwardFromChannels[R any](inputChannels []chan R, unsubscribed *atomic.Boo
inputCases[i+1] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}

unclosedInputChannels := len(inputCases)
for unclosedInputChannels > 0 {
chosen, value, ok := reflect.Select(inputCases)
lastMessageTime := time.Now()
loop:
for {
// this mechanism removes closed input channels. When there is none left, the subscription is considered "disconnected".
_, value, ok := reflect.Select(inputCases)
if !ok {
// The chosen channel has been closed, so zero out the channel to disable the case
inputCases[chosen].Chan = reflect.ValueOf(nil)
unclosedInputChannels--
continue
logger.Error("Failed to read from the channel")
break loop
}

if unsubscribed != nil && unsubscribed.Load() {
// flag that the service needs to stop
if stopped != nil && stopped.Load() {
return
}

// flag that the backend channels were disconnected
if backendDisconnected != nil && backendDisconnected.Load() {
break loop
}

switch v := value.Interface().(type) {
case time.Time:
// exit the loop to avoid a goroutine leak
if unsubscribed != nil && unsubscribed.Load() {
return
// no message was received longer than the timeout. Exiting.
if time.Since(lastMessageTime) > timeoutInterval {
break loop
}
case R:
lastMessageTime = time.Now()
err := onMessage(v)
if err != nil {
// todo - log
return
logger.Error("Failed to process message", log.ErrKey, err)
break loop
}
default:
// ignore unexpected element
continue
logger.Warn("Received unexpected message type.", "type", reflect.TypeOf(v), "value", value)
break loop
}
}

if onBackendDisconnect != nil {
onBackendDisconnect()
}
}

// HandleUnsubscribe - when the client calls "unsubscribe" or the subscription times out, it calls `onSub`
// Must be called as a go routine!
func HandleUnsubscribe(connectionSub *rpc.Subscription, unsubscribed *atomic.Bool, onUnsub func()) {
func HandleUnsubscribe(connectionSub *rpc.Subscription, onUnsub func()) {
<-connectionSub.Err()
onUnsub()
}

// HandleUnsubscribeErrChan - when the client calls "unsubscribe" or the subscription times out, it calls `onSub`
// Must be called as a go routine!
func HandleUnsubscribeErrChan(errChan []<-chan error, onUnsub func()) {
inputCases := make([]reflect.SelectCase, len(errChan))
for i, ch := range errChan {
inputCases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
reflect.Select(inputCases)
onUnsub()
}
30 changes: 23 additions & 7 deletions go/host/rpc/clientapi/client_api_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/ten-protocol/go-ten/go/common/host"
subscriptioncommon "github.com/ten-protocol/go-ten/go/common/subscription"
Expand All @@ -26,9 +27,16 @@ type FilterAPI struct {

func NewFilterAPI(host host.Host, logger gethlog.Logger) *FilterAPI {
return &FilterAPI{
host: host,
logger: logger,
NewHeadsService: subscriptioncommon.NewNewHeadsService(host.NewHeadsChan(), false, logger, nil),
host: host,
logger: logger,
NewHeadsService: subscriptioncommon.NewNewHeadsService(
func() (chan *common.BatchHeader, <-chan error, error) {
return host.NewHeadsChan(), nil, nil
},
false,
logger,
nil,
),
}
}

Expand Down Expand Up @@ -57,10 +65,18 @@ func (api *FilterAPI) Logs(ctx context.Context, encryptedParams common.Encrypted
}

var unsubscribed atomic.Bool
go subscriptioncommon.ForwardFromChannels([]chan []byte{logsFromSubscription}, &unsubscribed, func(elem []byte) error {
return notifier.Notify(subscription.ID, elem)
})
go subscriptioncommon.HandleUnsubscribe(subscription, &unsubscribed, func() {
go subscriptioncommon.ForwardFromChannels(
[]chan []byte{logsFromSubscription},
func(elem []byte) error {
return notifier.Notify(subscription.ID, elem)
},
nil,
nil,
&unsubscribed,
12*time.Hour,
api.logger,
)
go subscriptioncommon.HandleUnsubscribe(subscription, func() {
api.host.UnsubscribeLogs(subscription.ID)
unsubscribed.Store(true)
})
Expand Down
56 changes: 36 additions & 20 deletions go/rpc/encrypted_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"encoding/json"
"fmt"
"reflect"
"sync/atomic"
"time"

"github.com/ten-protocol/go-ten/go/common/rpc"
"github.com/ten-protocol/go-ten/go/common/subscription"
Expand Down Expand Up @@ -253,31 +255,45 @@ func (c *EncRPCClient) logSubscription(ctx context.Context, namespace string, ch
return nil, err
}

// todo - do we need to handle unsubscribe in a special way?
// probably not, because when the inbound channel is closed, this goroutine will exit as well.
go subscription.ForwardFromChannels([]chan []byte{inboundChannel}, nil, func(encLog []byte) error {
jsonLogs, err := c.decryptResponse(encLog)
if err != nil {
c.logger.Error("could not decrypt logs received from subscription.", log.ErrKey, err)
return err
}

var logs []*types.Log
err = json.Unmarshal(jsonLogs, &logs)
if err != nil {
c.logger.Error(fmt.Sprintf("could not unmarshal log from JSON. Received data: %s.", string(jsonLogs)), log.ErrKey, err)
return err
}

for _, decryptedLog := range logs {
outboundChannel <- *decryptedLog
}
return nil
backendDisconnected := &atomic.Bool{}
go subscription.HandleUnsubscribeErrChan([]<-chan error{backendSub.Err()}, func() {
backendDisconnected.Store(true)
})
go subscription.ForwardFromChannels(
[]chan []byte{inboundChannel},
func(encLog []byte) error {
return c.onMessage(encLog, outboundChannel)
},
nil,
backendDisconnected,
nil,
12*time.Hour,
c.logger,
)

return backendSub, nil
}

func (c *EncRPCClient) onMessage(encLog []byte, outboundChannel chan types.Log) error {
jsonLogs, err := c.decryptResponse(encLog)
if err != nil {
c.logger.Error("could not decrypt logs received from subscription.", log.ErrKey, err)
return err
}

var logs []*types.Log
err = json.Unmarshal(jsonLogs, &logs)
if err != nil {
c.logger.Error(fmt.Sprintf("could not unmarshal log from JSON. Received data: %s.", string(jsonLogs)), log.ErrKey, err)
return err
}

for _, decryptedLog := range logs {
outboundChannel <- *decryptedLog
}
return nil
}

func (c *EncRPCClient) newHeadSubscription(ctx context.Context, namespace string, ch interface{}, args ...any) (*gethrpc.ClientSubscription, error) {
return nil, fmt.Errorf("not implemented")
}
Expand Down
Loading
Loading