From 9c6702f47e79c45ad61d1726e31f942b74fac6c2 Mon Sep 17 00:00:00 2001 From: Tudor Malene Date: Wed, 15 May 2024 13:21:34 +0100 Subject: [PATCH 1/2] improve rpc conn pool --- tools/walletextension/rpcapi/filter_api.go | 4 +- tools/walletextension/rpcapi/utils.go | 39 ++++++++++++------- .../rpcapi/wallet_extension.go | 5 ++- 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/tools/walletextension/rpcapi/filter_api.go b/tools/walletextension/rpcapi/filter_api.go index 414a85308b..08f380a7b1 100644 --- a/tools/walletextension/rpcapi/filter_api.go +++ b/tools/walletextension/rpcapi/filter_api.go @@ -78,7 +78,7 @@ func (api *FilterAPI) Logs(ctx context.Context, crit common.FilterCriteria) (*rp errorChannels := make([]<-chan error, 0) backendSubscriptions := make([]*rpc.ClientSubscription, 0) for _, address := range candidateAddresses { - rpcWSClient, err := connectWS(user.accounts[*address], api.we.Logger()) + rpcWSClient, err := connectWS(ctx, user.accounts[*address], api.we.Logger()) if err != nil { return nil, err } @@ -143,7 +143,7 @@ func (api *FilterAPI) closeConnections(backendSubscriptions []*rpc.ClientSubscri backendSub.Unsubscribe() } for _, connection := range backendWSConnections { - _ = returnConn(api.we.rpcWSConnPool, connection.BackingClient()) + _ = returnConn(api.we.rpcWSConnPool, connection.BackingClient(), api.logger) } } diff --git a/tools/walletextension/rpcapi/utils.go b/tools/walletextension/rpcapi/utils.go index 357e38ca18..9dbca7148a 100644 --- a/tools/walletextension/rpcapi/utils.go +++ b/tools/walletextension/rpcapi/utils.go @@ -8,6 +8,11 @@ import ( "fmt" "time" + "github.com/ten-protocol/go-ten/go/common/measure" + "github.com/ten-protocol/go-ten/go/enclave/core" + + "github.com/ten-protocol/go-ten/go/common/log" + gethlog "github.com/ethereum/go-ethereum/log" pool "github.com/jolestar/go-commons-pool/v2" tenrpc "github.com/ten-protocol/go-ten/go/rpc" @@ -72,7 +77,7 @@ func UnauthenticatedTenRPCCall[R any](ctx context.Context, w *Services, cfg *Cac cacheArgs = append(cacheArgs, args...) res, err := withCache(w.Cache, cfg, generateCacheKey(cacheArgs), func() (*R, error) { - return withPlainRPCConnection(w, func(client *rpc.Client) (*R, error) { + return withPlainRPCConnection(ctx, w, func(client *rpc.Client) (*R, error) { var resp *R var err error @@ -117,7 +122,7 @@ func ExecAuthRPC[R any](ctx context.Context, w *Services, cfg *ExecCfg, method s var rpcErr error for i := range candidateAccts { acct := candidateAccts[i] - result, err := withEncRPCConnection(w, acct, func(rpcClient *tenrpc.EncRPCClient) (*R, error) { + result, err := withEncRPCConnection(ctx, w, acct, func(rpcClient *tenrpc.EncRPCClient) (*R, error) { var result *R adjustedArgs := args if cfg.adjustArgs != nil { @@ -272,42 +277,48 @@ func cacheBlockNumber(lastBlock rpc.BlockNumber) CacheStrategy { return LatestBatch } -func connectWS(account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) { - return conn(account.user.services.rpcWSConnPool, account, logger) +func connectWS(ctx context.Context, account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) { + return conn(ctx, account.user.services.rpcWSConnPool, account, logger) } -func conn(p *pool.ObjectPool, account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) { - connectionObj, err := p.BorrowObject(context.Background()) +func conn(ctx context.Context, p *pool.ObjectPool, account *GWAccount, logger gethlog.Logger) (*tenrpc.EncRPCClient, error) { + defer core.LogMethodDuration(logger, measure.NewStopwatch(), "get rpc connection") + connectionObj, err := p.BorrowObject(ctx) if err != nil { return nil, fmt.Errorf("cannot fetch rpc connection to backend node %w", err) } conn := connectionObj.(*rpc.Client) encClient, err := wecommon.CreateEncClient(conn, account.address.Bytes(), account.user.userKey, account.signature, account.signatureType, logger) if err != nil { + _ = returnConn(p, conn, logger) return nil, fmt.Errorf("error creating new client, %w", err) } return encClient, nil } -func returnConn(p *pool.ObjectPool, conn tenrpc.Client) error { - return p.ReturnObject(context.Background(), conn) +func returnConn(p *pool.ObjectPool, conn tenrpc.Client, logger gethlog.Logger) error { + err := p.ReturnObject(context.Background(), conn) + if err != nil { + logger.Error("Error returning connection to pool", log.ErrKey, err) + } + return err } -func withEncRPCConnection[R any](w *Services, acct *GWAccount, execute func(*tenrpc.EncRPCClient) (*R, error)) (*R, error) { - rpcClient, err := conn(acct.user.services.rpcHTTPConnPool, acct, w.logger) +func withEncRPCConnection[R any](ctx context.Context, w *Services, acct *GWAccount, execute func(*tenrpc.EncRPCClient) (*R, error)) (*R, error) { + rpcClient, err := conn(ctx, acct.user.services.rpcHTTPConnPool, acct, w.logger) if err != nil { return nil, fmt.Errorf("could not connect to backed. Cause: %w", err) } - defer returnConn(w.rpcHTTPConnPool, rpcClient.BackingClient()) + defer returnConn(w.rpcHTTPConnPool, rpcClient.BackingClient(), w.logger) return execute(rpcClient) } -func withPlainRPCConnection[R any](w *Services, execute func(client *rpc.Client) (*R, error)) (*R, error) { - connectionObj, err := w.rpcHTTPConnPool.BorrowObject(context.Background()) +func withPlainRPCConnection[R any](ctx context.Context, w *Services, execute func(client *rpc.Client) (*R, error)) (*R, error) { + connectionObj, err := w.rpcHTTPConnPool.BorrowObject(ctx) if err != nil { return nil, fmt.Errorf("cannot fetch rpc connection to backend node %w", err) } rpcClient := connectionObj.(*rpc.Client) - defer returnConn(w.rpcHTTPConnPool, rpcClient) + defer returnConn(w.rpcHTTPConnPool, rpcClient, w.logger) return execute(rpcClient) } diff --git a/tools/walletextension/rpcapi/wallet_extension.go b/tools/walletextension/rpcapi/wallet_extension.go index 4c2589c37d..0d2b742791 100644 --- a/tools/walletextension/rpcapi/wallet_extension.go +++ b/tools/walletextension/rpcapi/wallet_extension.go @@ -90,7 +90,7 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage }, nil, nil, nil) cfg := pool.NewDefaultPoolConfig() - cfg.MaxTotal = 100 // todo - what is the right number + cfg.MaxTotal = 200 // todo - what is the right number services := Services{ HostAddrHTTP: hostAddrHTTP, @@ -138,6 +138,7 @@ func subscribeToNewHeadsWithRetry(ch chan *tencommon.BatchHeader, services Servi sub, err = rpcClient.Subscribe(context.Background(), rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeNewHeads) if err != nil { logger.Info("could not subscribe for new head blocks", log.ErrKey, err) + _ = returnConn(services.rpcWSConnPool, rpcClient, logger) } return err }, @@ -270,7 +271,7 @@ func (w *Services) Version() string { } func (w *Services) GetTenNodeHealthStatus() (bool, error) { - res, err := withPlainRPCConnection[bool](w, func(client *gethrpc.Client) (*bool, error) { + res, err := withPlainRPCConnection[bool](context.Background(), w, func(client *gethrpc.Client) (*bool, error) { res, err := obsclient.NewObsClient(client).Health() return &res, err }) From 1f16e56c53c360932cc7cd0b50cf4fa887b4728a Mon Sep 17 00:00:00 2001 From: Tudor Malene Date: Wed, 15 May 2024 13:32:50 +0100 Subject: [PATCH 2/2] release connections --- tools/walletextension/rpcapi/filter_api.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tools/walletextension/rpcapi/filter_api.go b/tools/walletextension/rpcapi/filter_api.go index 08f380a7b1..b7f049833c 100644 --- a/tools/walletextension/rpcapi/filter_api.go +++ b/tools/walletextension/rpcapi/filter_api.go @@ -116,7 +116,10 @@ func (api *FilterAPI) Logs(ctx context.Context, crit common.FilterCriteria) (*rp } return nil }, - nil, // todo - we can implement reconnect logic here + func() { + // release resources + api.closeConnections(backendSubscriptions, backendWSConnections) + }, // todo - we can implement reconnect logic here &unsubscribedByBackend, &unsubscribedByClient, 12*time.Hour, @@ -126,7 +129,6 @@ func (api *FilterAPI) Logs(ctx context.Context, crit common.FilterCriteria) (*rp // handles any of the backend connections being closed go subscriptioncommon.HandleUnsubscribeErrChan(errorChannels, func() { unsubscribedByBackend.Store(true) - api.closeConnections(backendSubscriptions, backendWSConnections) }) // handles "unsubscribe" from the user