Skip to content

Commit

Permalink
improve rpc conn pool
Browse files Browse the repository at this point in the history
  • Loading branch information
tudor-malene committed May 15, 2024
1 parent 1acc9e3 commit 9c6702f
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 18 deletions.
4 changes: 2 additions & 2 deletions tools/walletextension/rpcapi/filter_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (api *FilterAPI) Logs(ctx context.Context, crit common.FilterCriteria) (*rp
errorChannels := make([]<-chan error, 0)
backendSubscriptions := make([]*rpc.ClientSubscription, 0)
for _, address := range candidateAddresses {
rpcWSClient, err := connectWS(user.accounts[*address], api.we.Logger())
rpcWSClient, err := connectWS(ctx, user.accounts[*address], api.we.Logger())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -143,7 +143,7 @@ func (api *FilterAPI) closeConnections(backendSubscriptions []*rpc.ClientSubscri
backendSub.Unsubscribe()
}
for _, connection := range backendWSConnections {
_ = returnConn(api.we.rpcWSConnPool, connection.BackingClient())
_ = returnConn(api.we.rpcWSConnPool, connection.BackingClient(), api.logger)
}
}

Expand Down
39 changes: 25 additions & 14 deletions tools/walletextension/rpcapi/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
"fmt"
"time"

"github.com/ten-protocol/go-ten/go/common/measure"
"github.com/ten-protocol/go-ten/go/enclave/core"

"github.com/ten-protocol/go-ten/go/common/log"

gethlog "github.com/ethereum/go-ethereum/log"
pool "github.com/jolestar/go-commons-pool/v2"
tenrpc "github.com/ten-protocol/go-ten/go/rpc"
Expand Down Expand Up @@ -72,7 +77,7 @@ func UnauthenticatedTenRPCCall[R any](ctx context.Context, w *Services, cfg *Cac
cacheArgs = append(cacheArgs, args...)

res, err := withCache(w.Cache, cfg, generateCacheKey(cacheArgs), func() (*R, error) {
return withPlainRPCConnection(w, func(client *rpc.Client) (*R, error) {
return withPlainRPCConnection(ctx, w, func(client *rpc.Client) (*R, error) {
var resp *R
var err error

Expand Down Expand Up @@ -117,7 +122,7 @@ func ExecAuthRPC[R any](ctx context.Context, w *Services, cfg *ExecCfg, method s
var rpcErr error
for i := range candidateAccts {
acct := candidateAccts[i]
result, err := withEncRPCConnection(w, acct, func(rpcClient *tenrpc.EncRPCClient) (*R, error) {
result, err := withEncRPCConnection(ctx, w, acct, func(rpcClient *tenrpc.EncRPCClient) (*R, error) {
var result *R
adjustedArgs := args
if cfg.adjustArgs != nil {
Expand Down Expand Up @@ -272,42 +277,48 @@ func cacheBlockNumber(lastBlock rpc.BlockNumber) CacheStrategy {
return LatestBatch
}

func connectWS(account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) {
return conn(account.user.services.rpcWSConnPool, account, logger)
func connectWS(ctx context.Context, account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) {
return conn(ctx, account.user.services.rpcWSConnPool, account, logger)
}

func conn(p *pool.ObjectPool, account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) {
connectionObj, err := p.BorrowObject(context.Background())
func conn(ctx context.Context, p *pool.ObjectPool, account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) {
defer core.LogMethodDuration(logger, measure.NewStopwatch(), "get rpc connection")
connectionObj, err := p.BorrowObject(ctx)
if err != nil {
return nil, fmt.Errorf("cannot fetch rpc connection to backend node %w", err)
}
conn := connectionObj.(*rpc.Client)
encClient, err := wecommon.CreateEncClient(conn, account.address.Bytes(), account.user.userKey, account.signature, account.signatureType, logger)
if err != nil {
_ = returnConn(p, conn, logger)
return nil, fmt.Errorf("error creating new client, %w", err)
}
return encClient, nil
}

func returnConn(p *pool.ObjectPool, conn tenrpc.Client) error {
return p.ReturnObject(context.Background(), conn)
func returnConn(p *pool.ObjectPool, conn tenrpc.Client, logger gethlog.Logger) error {
err := p.ReturnObject(context.Background(), conn)
if err != nil {
logger.Error("Error returning connection to pool", log.ErrKey, err)
}
return err
}

func withEncRPCConnection[R any](w *Services, acct *GWAccount, execute func(*tenrpc.EncRPCClient) (*R, error)) (*R, error) {
rpcClient, err := conn(acct.user.services.rpcHTTPConnPool, acct, w.logger)
func withEncRPCConnection[R any](ctx context.Context, w *Services, acct *GWAccount, execute func(*tenrpc.EncRPCClient) (*R, error)) (*R, error) {
rpcClient, err := conn(ctx, acct.user.services.rpcHTTPConnPool, acct, w.logger)
if err != nil {
return nil, fmt.Errorf("could not connect to backed. Cause: %w", err)
}
defer returnConn(w.rpcHTTPConnPool, rpcClient.BackingClient())
defer returnConn(w.rpcHTTPConnPool, rpcClient.BackingClient(), w.logger)
return execute(rpcClient)
}

func withPlainRPCConnection[R any](w *Services, execute func(client *rpc.Client) (*R, error)) (*R, error) {
connectionObj, err := w.rpcHTTPConnPool.BorrowObject(context.Background())
func withPlainRPCConnection[R any](ctx context.Context, w *Services, execute func(client *rpc.Client) (*R, error)) (*R, error) {
connectionObj, err := w.rpcHTTPConnPool.BorrowObject(ctx)
if err != nil {
return nil, fmt.Errorf("cannot fetch rpc connection to backend node %w", err)
}
rpcClient := connectionObj.(*rpc.Client)
defer returnConn(w.rpcHTTPConnPool, rpcClient)
defer returnConn(w.rpcHTTPConnPool, rpcClient, w.logger)
return execute(rpcClient)
}
5 changes: 3 additions & 2 deletions tools/walletextension/rpcapi/wallet_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage
}, nil, nil, nil)

cfg := pool.NewDefaultPoolConfig()
cfg.MaxTotal = 100 // todo - what is the right number
cfg.MaxTotal = 200 // todo - what is the right number

services := Services{
HostAddrHTTP: hostAddrHTTP,
Expand Down Expand Up @@ -138,6 +138,7 @@ func subscribeToNewHeadsWithRetry(ch chan *tencommon.BatchHeader, services Servi
sub, err = rpcClient.Subscribe(context.Background(), rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeNewHeads)
if err != nil {
logger.Info("could not subscribe for new head blocks", log.ErrKey, err)
_ = returnConn(services.rpcWSConnPool, rpcClient, logger)
}
return err
},
Expand Down Expand Up @@ -270,7 +271,7 @@ func (w *Services) Version() string {
}

func (w *Services) GetTenNodeHealthStatus() (bool, error) {
res, err := withPlainRPCConnection[bool](w, func(client *gethrpc.Client) (*bool, error) {
res, err := withPlainRPCConnection[bool](context.Background(), w, func(client *gethrpc.Client) (*bool, error) {
res, err := obsclient.NewObsClient(client).Health()
return &res, err
})
Expand Down

0 comments on commit 9c6702f

Please sign in to comment.