From 2ab73cfdb50779c4c256102bf0f1b75f0a9c5384 Mon Sep 17 00:00:00 2001 From: Tudor Malene Date: Thu, 28 Mar 2024 11:57:14 +0000 Subject: [PATCH] use conn pool for unauthenticated connections (#1855) --- go/rpc/encrypted_client.go | 10 ++++- tools/walletextension/rpcapi/filter_api.go | 2 +- tools/walletextension/rpcapi/utils.go | 40 +++++++++++-------- .../rpcapi/wallet_extension.go | 4 -- 4 files changed, 33 insertions(+), 23 deletions(-) diff --git a/go/rpc/encrypted_client.go b/go/rpc/encrypted_client.go index 8b0d8635e8..e03dde7e6e 100644 --- a/go/rpc/encrypted_client.go +++ b/go/rpc/encrypted_client.go @@ -68,8 +68,14 @@ func NewEncRPCClient(client Client, viewingKey *viewingkey.ViewingKey, logger ge return encClient, nil } -func (c *EncRPCClient) Client() Client { - return c.obscuroClient +func (c *EncRPCClient) Client() *gethrpc.Client { + switch backingClient := c.obscuroClient.(type) { + case *NetworkClient: + return backingClient.RpcClient + default: + // not supported + return nil + } } // Call handles JSON rpc requests without a context - see CallContext for details diff --git a/tools/walletextension/rpcapi/filter_api.go b/tools/walletextension/rpcapi/filter_api.go index 247d235b05..40299a5782 100644 --- a/tools/walletextension/rpcapi/filter_api.go +++ b/tools/walletextension/rpcapi/filter_api.go @@ -190,7 +190,7 @@ func handleUnsubscribe(connectionSub *rpc.Subscription, backendSubscriptions []* backendSub.Unsubscribe() } for _, connection := range connections { - _ = returnConn(p, connection) + _ = returnConn(p, connection.Client()) } } diff --git a/tools/walletextension/rpcapi/utils.go b/tools/walletextension/rpcapi/utils.go index a0f24c1c74..27366ee756 100644 --- a/tools/walletextension/rpcapi/utils.go +++ b/tools/walletextension/rpcapi/utils.go @@ -59,17 +59,16 @@ func UnauthenticatedTenRPCCall[R any](ctx context.Context, w *Services, cfg *Cac cacheArgs = append(cacheArgs, args...) res, err := withCache(w.Cache, cfg, generateCacheKey(cacheArgs), func() (*R, error) { - var resp *R - unauthedRPC, err := w.UnauthenticatedClient() - if err != nil { - return nil, err - } - if ctx == nil { - err = unauthedRPC.Call(&resp, method, args...) - } else { - err = unauthedRPC.CallContext(ctx, &resp, method, args...) - } - return resp, err + return withPlainRPCConnection(w, func(client *rpc.Client) (*R, error) { + var resp *R + var err error + if ctx == nil { + err = client.Call(&resp, method, args...) + } else { + err = client.CallContext(ctx, &resp, method, args...) + } + return resp, err + }) }) audit(w, "RPC call. method=%s args=%v result=%s error=%s time=%d", method, args, res, err, time.Since(requestStartTime).Milliseconds()) return res, err @@ -104,7 +103,7 @@ func ExecAuthRPC[R any](ctx context.Context, w *Services, cfg *ExecCfg, method s var rpcErr error for i := range candidateAccts { acct := candidateAccts[i] - result, err := withHTTPRPCConnection(w, acct, func(rpcClient *tenrpc.EncRPCClient) (*R, error) { + result, err := withEncRPCConnection(w, acct, func(rpcClient *tenrpc.EncRPCClient) (*R, error) { var result *R adjustedArgs := args if cfg.adjustArgs != nil { @@ -258,16 +257,25 @@ func conn(p *pool.ObjectPool, account *GWAccount, logger gethlog.Logger) (*tenrp return encClient, nil } -func returnConn(p *pool.ObjectPool, conn *tenrpc.EncRPCClient) error { - c := conn.Client().(*tenrpc.NetworkClient).RpcClient - return p.ReturnObject(context.Background(), c) +func returnConn(p *pool.ObjectPool, conn *rpc.Client) error { + return p.ReturnObject(context.Background(), conn) } -func withHTTPRPCConnection[R any](w *Services, acct *GWAccount, execute func(*tenrpc.EncRPCClient) (*R, error)) (*R, error) { +func withEncRPCConnection[R any](w *Services, acct *GWAccount, execute func(*tenrpc.EncRPCClient) (*R, error)) (*R, error) { rpcClient, err := conn(acct.user.services.rpcHTTPConnPool, acct, w.logger) if err != nil { return nil, fmt.Errorf("could not connect to backed. Cause: %w", err) } + defer returnConn(w.rpcHTTPConnPool, rpcClient.Client()) + return execute(rpcClient) +} + +func withPlainRPCConnection[R any](w *Services, execute func(client *rpc.Client) (*R, error)) (*R, error) { + connectionObj, err := w.rpcHTTPConnPool.BorrowObject(context.Background()) + if err != nil { + return nil, fmt.Errorf("cannot fetch rpc connection to backend node %w", err) + } + rpcClient := connectionObj.(*rpc.Client) defer returnConn(w.rpcHTTPConnPool, rpcClient) return execute(rpcClient) } diff --git a/tools/walletextension/rpcapi/wallet_extension.go b/tools/walletextension/rpcapi/wallet_extension.go index 3b27bbe750..b8b6891016 100644 --- a/tools/walletextension/rpcapi/wallet_extension.go +++ b/tools/walletextension/rpcapi/wallet_extension.go @@ -226,10 +226,6 @@ func (w *Services) GetTenNodeHealthStatus() (bool, error) { return w.tenClient.Health() } -func (w *Services) UnauthenticatedClient() (rpc.Client, error) { - return rpc.NewNetworkClient(w.HostAddrHTTP) -} - func (w *Services) GenerateUserMessageToSign(encryptionToken []byte, formatsSlice []string) (string, error) { // Check if the formats are valid for _, format := range formatsSlice {