Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New heads subscription #1861

Merged
merged 5 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions go/common/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ type Host interface {
Start() error
// SubmitAndBroadcastTx submits an encrypted transaction to the enclave, and broadcasts it to the other hosts on the network.
SubmitAndBroadcastTx(encryptedParams common.EncryptedParamsSendRawTx) (*responses.RawTx, error)
// Subscribe feeds logs matching the encrypted log subscription to the matchedLogs channel.
Subscribe(id rpc.ID, encryptedLogSubscription common.EncryptedParamsLogSubscription, matchedLogs chan []byte) error
// Unsubscribe terminates a log subscription between the host and the enclave.
Unsubscribe(id rpc.ID)
// SubscribeLogs feeds logs matching the encrypted log subscription to the matchedLogs channel.
SubscribeLogs(id rpc.ID, encryptedLogSubscription common.EncryptedParamsLogSubscription, matchedLogs chan []byte) error
// UnsubscribeLogs terminates a log subscription between the host and the enclave.
UnsubscribeLogs(id rpc.ID)
// Stop gracefully stops the host execution.
Stop() error

Expand All @@ -31,6 +31,10 @@ type Host interface {

// ObscuroConfig returns the info of the Obscuro network
ObscuroConfig() (*common.ObscuroNetworkInfo, error)

// NewHeadsChan returns live batch headers
// Note - do not use directly. This is meant only for the NewHeadsManager, which multiplexes the headers
NewHeadsChan() chan *common.BatchHeader
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth a comment on this to say it should only be called once.

}

type BlockStream struct {
Expand Down
1 change: 1 addition & 0 deletions go/common/host/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
L2BatchRepositoryName = "l2-batch-repo"
EnclaveServiceName = "enclaves"
LogSubscriptionServiceName = "log-subs"
FilterAPIServiceName = "filter-api"
)

// The host has a number of services that encapsulate the various responsibilities of the host.
Expand Down
116 changes: 116 additions & 0 deletions go/common/subscription/new_heads_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package subscription

import (
"math/big"
"sync"
"sync/atomic"

gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"

gethlog "github.com/ethereum/go-ethereum/log"
"github.com/ten-protocol/go-ten/go/common"
"github.com/ten-protocol/go-ten/go/common/host"
"github.com/ten-protocol/go-ten/go/common/log"
"github.com/ten-protocol/go-ten/lib/gethfork/rpc"
)

// NewHeadsService multiplexes new batch header messages from an input channel into multiple subscribers
// also handles unsubscribe
// Note: this is a service which must be Started and Stopped
type NewHeadsService struct {
inputCh chan *common.BatchHeader
convertToEthHeader bool
notifiersMutex *sync.RWMutex
newHeadNotifiers map[rpc.ID]*rpc.Notifier
onMessage func(*common.BatchHeader) error
stopped *atomic.Bool
logger gethlog.Logger
}

func NewNewHeadsService(inputCh chan *common.BatchHeader, convertToEthHeader bool, logger gethlog.Logger, onMessage func(*common.BatchHeader) error) *NewHeadsService {
return &NewHeadsService{
inputCh: inputCh,
convertToEthHeader: convertToEthHeader,
onMessage: onMessage,
logger: logger,
stopped: &atomic.Bool{},
newHeadNotifiers: make(map[rpc.ID]*rpc.Notifier),
notifiersMutex: &sync.RWMutex{},
}
}

func (nhs *NewHeadsService) Start() error {
go ForwardFromChannels([]chan *common.BatchHeader{nhs.inputCh}, nhs.stopped, func(head *common.BatchHeader) error {
nhs.notifiersMutex.RLock()
defer nhs.notifiersMutex.RUnlock()

if nhs.onMessage != nil {
err := nhs.onMessage(head)
if err != nil {
nhs.logger.Info("failed invoking onMessage callback.", log.ErrKey, err)
}
}

var msg any = head
if nhs.convertToEthHeader {
msg = convertBatchHeader(head)
}

// for each new head, notify all registered subscriptions
for id, notifier := range nhs.newHeadNotifiers {
if nhs.stopped.Load() {
return nil
}
err := notifier.Notify(id, msg)
if err != nil {
// on error, remove the notification
nhs.logger.Info("failed to notify newHead subscription", log.ErrKey, err, log.SubIDKey, id)
nhs.notifiersMutex.Lock()
delete(nhs.newHeadNotifiers, id)
nhs.notifiersMutex.Unlock()
}
}
return nil
})
return nil
}

func (nhs *NewHeadsService) RegisterNotifier(notifier *rpc.Notifier, subscription *rpc.Subscription) {
nhs.notifiersMutex.Lock()
defer nhs.notifiersMutex.Unlock()
nhs.newHeadNotifiers[subscription.ID] = notifier

go HandleUnsubscribe(subscription, nil, func() {
nhs.notifiersMutex.Lock()
defer nhs.notifiersMutex.Unlock()
delete(nhs.newHeadNotifiers, subscription.ID)
})
}

func (nhs *NewHeadsService) Stop() error {
nhs.stopped.Store(true)
return nil
}

func (nhs *NewHeadsService) HealthStatus() host.HealthStatus {
return &host.BasicErrHealthStatus{}
}

func convertBatchHeader(head *common.BatchHeader) *types.Header {
return &types.Header{
ParentHash: head.ParentHash,
UncleHash: gethcommon.Hash{},
Root: head.Root,
TxHash: head.TxHash,
ReceiptHash: head.ReceiptHash,
Bloom: types.Bloom{},
Difficulty: big.NewInt(0),
Number: head.Number,
GasLimit: head.GasLimit,
GasUsed: head.GasUsed,
Time: head.Time,
Extra: make([]byte, 0),
BaseFee: head.BaseFee,
}
}
69 changes: 69 additions & 0 deletions go/common/subscription/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package subscription

import (
"reflect"
"sync/atomic"
"time"

"github.com/ten-protocol/go-ten/lib/gethfork/rpc"
)

// ForwardFromChannels - reads messages from the input channels, and calls the `onMessage` callback.
// Exits when the unsubscribed flag is true.
// Must be called as a go routine!
func ForwardFromChannels[R any](inputChannels []chan R, unsubscribed *atomic.Bool, onMessage func(R) error) {
inputCases := make([]reflect.SelectCase, len(inputChannels)+1)

// create a ticker to handle cleanup, check the "unsubscribed" flag and exit the goroutine
inputCases[0] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(time.NewTicker(2 * time.Second).C),
}

// create a select "case" for each input channel
for i, ch := range inputChannels {
inputCases[i+1] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}

unclosedInputChannels := len(inputCases)
for unclosedInputChannels > 0 {
chosen, value, ok := reflect.Select(inputCases)
if !ok {
// The chosen channel has been closed, so zero out the channel to disable the case
inputCases[chosen].Chan = reflect.ValueOf(nil)
unclosedInputChannels--
continue
}

if unsubscribed.Load() {
return
}

switch v := value.Interface().(type) {
case time.Time:
// exit the loop to avoid a goroutine leak
if unsubscribed.Load() {
return
}
case R:
err := onMessage(v)
if err != nil {
// todo - log
return
}
default:
// ignore unexpected element
continue
}
}
}

// HandleUnsubscribe - when the client calls "unsubscribe" or the subscription times out, it calls `onSub`
// Must be called as a go routine!
func HandleUnsubscribe(connectionSub *rpc.Subscription, unsubscribed *atomic.Bool, onUnsub func()) {
<-connectionSub.Err()
if unsubscribed != nil {
unsubscribed.Store(true)
}
onUnsub()
}
5 changes: 3 additions & 2 deletions go/host/container/host_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func NewHostContainer(cfg *config.HostConfig, services *host.ServicesRegistry, p
}

if cfg.HasClientRPCHTTP || cfg.HasClientRPCWebsockets {
filterAPI := clientapi.NewFilterAPI(h, logger)
rpcServer.RegisterAPIs([]rpc.API{
{
Namespace: APINamespaceObscuro,
Expand All @@ -192,7 +193,7 @@ func NewHostContainer(cfg *config.HostConfig, services *host.ServicesRegistry, p
},
{
Namespace: APINamespaceEth,
Service: clientapi.NewFilterAPI(h, logger),
Service: filterAPI,
},
{
Namespace: APINamespaceScan,
Expand All @@ -208,7 +209,7 @@ func NewHostContainer(cfg *config.HostConfig, services *host.ServicesRegistry, p
},
})
}
services.RegisterService(hostcommon.FilterAPIServiceName, filterAPI.NewHeadsService)
}

return hostContainer
}
19 changes: 17 additions & 2 deletions go/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ type host struct {

// l2MessageBusAddress is fetched from the enclave but cache it here because it never changes
l2MessageBusAddress *gethcommon.Address
newHeads chan *common.BatchHeader
}

type batchListener struct {
newHeads chan *common.BatchHeader
}

func (bl batchListener) HandleBatch(batch *common.ExtBatch) {
bl.newHeads <- batch.Header
}

func NewHost(config *config.HostConfig, hostServices *ServicesRegistry, p2p hostcommon.P2PHostService, ethClient ethadapter.EthClient, l1Repo hostcommon.L1RepoService, enclaveClients []common.Enclave, ethWallet wallet.Wallet, mgmtContractLib mgmtcontractlib.MgmtContractLib, logger gethlog.Logger, regMetrics gethmetrics.Registry) hostcommon.Host {
Expand All @@ -70,6 +79,7 @@ func NewHost(config *config.HostConfig, hostServices *ServicesRegistry, p2p host
metricRegistry: regMetrics,

stopControl: stopcontrol.New(),
newHeads: make(chan *common.BatchHeader),
}

enclGuardians := make([]*enclave.Guardian, 0, len(enclaveClients))
Expand All @@ -89,6 +99,7 @@ func NewHost(config *config.HostConfig, hostServices *ServicesRegistry, p2p host
l2Repo := l2.NewBatchRepository(config, hostServices, database, logger)
subsService := events.NewLogEventManager(hostServices, logger)

l2Repo.Subscribe(batchListener{newHeads: host.newHeads})
hostServices.RegisterService(hostcommon.P2PName, p2p)
hostServices.RegisterService(hostcommon.L1BlockRepositoryName, l1Repo)
maxWaitForL1Receipt := 6 * config.L1BlockTime // wait ~10 blocks to see if tx gets published before retrying
Expand Down Expand Up @@ -158,14 +169,14 @@ func (h *host) SubmitAndBroadcastTx(encryptedParams common.EncryptedParamsSendRa
return h.services.Enclaves().SubmitAndBroadcastTx(encryptedParams)
}

func (h *host) Subscribe(id rpc.ID, encryptedLogSubscription common.EncryptedParamsLogSubscription, matchedLogsCh chan []byte) error {
func (h *host) SubscribeLogs(id rpc.ID, encryptedLogSubscription common.EncryptedParamsLogSubscription, matchedLogsCh chan []byte) error {
if h.stopControl.IsStopping() {
return responses.ToInternalError(fmt.Errorf("requested Subscribe with the host stopping"))
}
return h.services.LogSubs().Subscribe(id, encryptedLogSubscription, matchedLogsCh)
}

func (h *host) Unsubscribe(id rpc.ID) {
func (h *host) UnsubscribeLogs(id rpc.ID) {
if h.stopControl.IsStopping() {
h.logger.Debug("requested Subscribe with the host stopping")
}
Expand Down Expand Up @@ -235,6 +246,10 @@ func (h *host) ObscuroConfig() (*common.ObscuroNetworkInfo, error) {
}, nil
}

func (h *host) NewHeadsChan() chan *common.BatchHeader {
return h.newHeads
}

// Checks the host config is valid.
func (h *host) validateConfig() {
if h.config.IsGenesis && h.config.NodeType != common.Sequencer {
Expand Down
Loading
Loading