Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve rpc conn pool #1916

Merged
merged 2 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions tools/walletextension/rpcapi/filter_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (api *FilterAPI) Logs(ctx context.Context, crit common.FilterCriteria) (*rp
errorChannels := make([]<-chan error, 0)
backendSubscriptions := make([]*rpc.ClientSubscription, 0)
for _, address := range candidateAddresses {
rpcWSClient, err := connectWS(user.accounts[*address], api.we.Logger())
rpcWSClient, err := connectWS(ctx, user.accounts[*address], api.we.Logger())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -116,7 +116,10 @@ func (api *FilterAPI) Logs(ctx context.Context, crit common.FilterCriteria) (*rp
}
return nil
},
nil, // todo - we can implement reconnect logic here
func() {
// release resources
api.closeConnections(backendSubscriptions, backendWSConnections)
}, // todo - we can implement reconnect logic here
&unsubscribedByBackend,
&unsubscribedByClient,
12*time.Hour,
Expand All @@ -126,7 +129,6 @@ func (api *FilterAPI) Logs(ctx context.Context, crit common.FilterCriteria) (*rp
// handles any of the backend connections being closed
go subscriptioncommon.HandleUnsubscribeErrChan(errorChannels, func() {
unsubscribedByBackend.Store(true)
api.closeConnections(backendSubscriptions, backendWSConnections)
})

// handles "unsubscribe" from the user
Expand All @@ -143,7 +145,7 @@ func (api *FilterAPI) closeConnections(backendSubscriptions []*rpc.ClientSubscri
backendSub.Unsubscribe()
}
for _, connection := range backendWSConnections {
_ = returnConn(api.we.rpcWSConnPool, connection.BackingClient())
_ = returnConn(api.we.rpcWSConnPool, connection.BackingClient(), api.logger)
}
}

Expand Down
39 changes: 25 additions & 14 deletions tools/walletextension/rpcapi/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
"fmt"
"time"

"github.com/ten-protocol/go-ten/go/common/measure"
"github.com/ten-protocol/go-ten/go/enclave/core"

"github.com/ten-protocol/go-ten/go/common/log"

gethlog "github.com/ethereum/go-ethereum/log"
pool "github.com/jolestar/go-commons-pool/v2"
tenrpc "github.com/ten-protocol/go-ten/go/rpc"
Expand Down Expand Up @@ -72,7 +77,7 @@ func UnauthenticatedTenRPCCall[R any](ctx context.Context, w *Services, cfg *Cac
cacheArgs = append(cacheArgs, args...)

res, err := withCache(w.Cache, cfg, generateCacheKey(cacheArgs), func() (*R, error) {
return withPlainRPCConnection(w, func(client *rpc.Client) (*R, error) {
return withPlainRPCConnection(ctx, w, func(client *rpc.Client) (*R, error) {
var resp *R
var err error

Expand Down Expand Up @@ -117,7 +122,7 @@ func ExecAuthRPC[R any](ctx context.Context, w *Services, cfg *ExecCfg, method s
var rpcErr error
for i := range candidateAccts {
acct := candidateAccts[i]
result, err := withEncRPCConnection(w, acct, func(rpcClient *tenrpc.EncRPCClient) (*R, error) {
result, err := withEncRPCConnection(ctx, w, acct, func(rpcClient *tenrpc.EncRPCClient) (*R, error) {
var result *R
adjustedArgs := args
if cfg.adjustArgs != nil {
Expand Down Expand Up @@ -272,42 +277,48 @@ func cacheBlockNumber(lastBlock rpc.BlockNumber) CacheStrategy {
return LatestBatch
}

func connectWS(account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) {
return conn(account.user.services.rpcWSConnPool, account, logger)
func connectWS(ctx context.Context, account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) {
return conn(ctx, account.user.services.rpcWSConnPool, account, logger)
}

func conn(p *pool.ObjectPool, account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) {
connectionObj, err := p.BorrowObject(context.Background())
func conn(ctx context.Context, p *pool.ObjectPool, account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) {
defer core.LogMethodDuration(logger, measure.NewStopwatch(), "get rpc connection")
connectionObj, err := p.BorrowObject(ctx)
if err != nil {
return nil, fmt.Errorf("cannot fetch rpc connection to backend node %w", err)
}
conn := connectionObj.(*rpc.Client)
encClient, err := wecommon.CreateEncClient(conn, account.address.Bytes(), account.user.userKey, account.signature, account.signatureType, logger)
if err != nil {
_ = returnConn(p, conn, logger)
return nil, fmt.Errorf("error creating new client, %w", err)
}
return encClient, nil
}

func returnConn(p *pool.ObjectPool, conn tenrpc.Client) error {
return p.ReturnObject(context.Background(), conn)
func returnConn(p *pool.ObjectPool, conn tenrpc.Client, logger gethlog.Logger) error {
err := p.ReturnObject(context.Background(), conn)
if err != nil {
logger.Error("Error returning connection to pool", log.ErrKey, err)
}
return err
}

func withEncRPCConnection[R any](w *Services, acct *GWAccount, execute func(*tenrpc.EncRPCClient) (*R, error)) (*R, error) {
rpcClient, err := conn(acct.user.services.rpcHTTPConnPool, acct, w.logger)
func withEncRPCConnection[R any](ctx context.Context, w *Services, acct *GWAccount, execute func(*tenrpc.EncRPCClient) (*R, error)) (*R, error) {
rpcClient, err := conn(ctx, acct.user.services.rpcHTTPConnPool, acct, w.logger)
if err != nil {
return nil, fmt.Errorf("could not connect to backed. Cause: %w", err)
}
defer returnConn(w.rpcHTTPConnPool, rpcClient.BackingClient())
defer returnConn(w.rpcHTTPConnPool, rpcClient.BackingClient(), w.logger)
return execute(rpcClient)
}

func withPlainRPCConnection[R any](w *Services, execute func(client *rpc.Client) (*R, error)) (*R, error) {
connectionObj, err := w.rpcHTTPConnPool.BorrowObject(context.Background())
func withPlainRPCConnection[R any](ctx context.Context, w *Services, execute func(client *rpc.Client) (*R, error)) (*R, error) {
connectionObj, err := w.rpcHTTPConnPool.BorrowObject(ctx)
if err != nil {
return nil, fmt.Errorf("cannot fetch rpc connection to backend node %w", err)
}
rpcClient := connectionObj.(*rpc.Client)
defer returnConn(w.rpcHTTPConnPool, rpcClient)
defer returnConn(w.rpcHTTPConnPool, rpcClient, w.logger)
return execute(rpcClient)
}
5 changes: 3 additions & 2 deletions tools/walletextension/rpcapi/wallet_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage
}, nil, nil, nil)

cfg := pool.NewDefaultPoolConfig()
cfg.MaxTotal = 100 // todo - what is the right number
cfg.MaxTotal = 200 // todo - what is the right number

services := Services{
HostAddrHTTP: hostAddrHTTP,
Expand Down Expand Up @@ -138,6 +138,7 @@ func subscribeToNewHeadsWithRetry(ch chan *tencommon.BatchHeader, services Servi
sub, err = rpcClient.Subscribe(context.Background(), rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeNewHeads)
if err != nil {
logger.Info("could not subscribe for new head blocks", log.ErrKey, err)
_ = returnConn(services.rpcWSConnPool, rpcClient, logger)
}
return err
},
Expand Down Expand Up @@ -270,7 +271,7 @@ func (w *Services) Version() string {
}

func (w *Services) GetTenNodeHealthStatus() (bool, error) {
res, err := withPlainRPCConnection[bool](w, func(client *gethrpc.Client) (*bool, error) {
res, err := withPlainRPCConnection[bool](context.Background(), w, func(client *gethrpc.Client) (*bool, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe worth putting a timeout on this context too in case the conn pool is gummed up?

res, err := obsclient.NewObsClient(client).Health()
return &res, err
})
Expand Down
Loading