diff --git a/go/obsclient/authclient.go b/go/obsclient/authclient.go index 6f4eec0495..1ead28487b 100644 --- a/go/obsclient/authclient.go +++ b/go/obsclient/authclient.go @@ -192,7 +192,7 @@ func (ac *AuthObsClient) BalanceAt(ctx context.Context, blockNumber *big.Int) (* } func (ac *AuthObsClient) SubscribeFilterLogs(ctx context.Context, filterCriteria common.FilterCriteria, ch chan common.IDAndLog) (ethereum.Subscription, error) { - return ac.rpcClient.Subscribe(ctx, nil, rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeLogs, filterCriteria) + return ac.rpcClient.Subscribe(ctx, rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeLogs, filterCriteria) } func (ac *AuthObsClient) GetLogs(ctx context.Context, filterCriteria common.FilterCriteria) ([]*types.Log, error) { diff --git a/go/obsclient/test_util.go b/go/obsclient/test_util.go index ace5d981ad..796d388217 100644 --- a/go/obsclient/test_util.go +++ b/go/obsclient/test_util.go @@ -23,7 +23,7 @@ func (m *rpcClientMock) CallContext(ctx context.Context, result interface{}, met return arguments.Error(0) } -func (m *rpcClientMock) Subscribe(context.Context, interface{}, string, interface{}, ...interface{}) (*rpc.ClientSubscription, error) { +func (m *rpcClientMock) Subscribe(context.Context, string, interface{}, ...interface{}) (*rpc.ClientSubscription, error) { panic("not implemented") } diff --git a/go/rpc/client.go b/go/rpc/client.go index 78e52fb28a..b1d05c6f6e 100644 --- a/go/rpc/client.go +++ b/go/rpc/client.go @@ -55,7 +55,7 @@ type Client interface { // CallContext If the context is canceled before the call has successfully returned, CallContext returns immediately. CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error // Subscribe creates a subscription to the Obscuro host. - Subscribe(ctx context.Context, result interface{}, namespace string, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error) + Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error) // Stop closes the client. Stop() } diff --git a/go/rpc/encrypted_client.go b/go/rpc/encrypted_client.go index e03dde7e6e..80b55571a7 100644 --- a/go/rpc/encrypted_client.go +++ b/go/rpc/encrypted_client.go @@ -68,14 +68,8 @@ func NewEncRPCClient(client Client, viewingKey *viewingkey.ViewingKey, logger ge return encClient, nil } -func (c *EncRPCClient) Client() *gethrpc.Client { - switch backingClient := c.obscuroClient.(type) { - case *NetworkClient: - return backingClient.RpcClient - default: - // not supported - return nil - } +func (c *EncRPCClient) BackingClient() Client { + return c.obscuroClient } // Call handles JSON rpc requests without a context - see CallContext for details @@ -101,7 +95,7 @@ func (c *EncRPCClient) CallContext(ctx context.Context, result interface{}, meth return c.executeSensitiveCall(ctx, result, method, args...) } -func (c *EncRPCClient) Subscribe(ctx context.Context, _ interface{}, namespace string, ch interface{}, args ...interface{}) (*gethrpc.ClientSubscription, error) { +func (c *EncRPCClient) Subscribe(ctx context.Context, namespace string, ch interface{}, args ...interface{}) (*gethrpc.ClientSubscription, error) { if len(args) == 0 { return nil, fmt.Errorf("subscription did not specify its type") } @@ -131,7 +125,7 @@ func (c *EncRPCClient) Subscribe(ctx context.Context, _ interface{}, namespace s return nil, fmt.Errorf("expected a channel of type `chan types.Log`, got %T", ch) } clientChannel := make(chan common.IDAndEncLog) - subscriptionToObscuro, err := c.obscuroClient.Subscribe(ctx, nil, namespace, clientChannel, subscriptionType, encryptedParams) + subscriptionToObscuro, err := c.obscuroClient.Subscribe(ctx, namespace, clientChannel, subscriptionType, encryptedParams) if err != nil { return nil, err } diff --git a/go/rpc/network_client.go b/go/rpc/network_client.go index aa08c4274c..2814aa009f 100644 --- a/go/rpc/network_client.go +++ b/go/rpc/network_client.go @@ -1,10 +1,6 @@ package rpc import ( - "context" - "fmt" - "strings" - "github.com/ten-protocol/go-ten/go/common/viewingkey" "github.com/ten-protocol/go-ten/lib/gethfork/rpc" gethrpc "github.com/ten-protocol/go-ten/lib/gethfork/rpc" @@ -12,16 +8,6 @@ import ( gethlog "github.com/ethereum/go-ethereum/log" ) -const ( - ws = "ws://" - http = "http://" -) - -// NetworkClient is a Client implementation that wraps Geth's rpc.Client to make calls to the obscuro node -type NetworkClient struct { - RpcClient *rpc.Client -} - // NewEncNetworkClient returns a network RPC client with Viewing Key encryption/decryption func NewEncNetworkClient(rpcAddress string, viewingKey *viewingkey.ViewingKey, logger gethlog.Logger) (*EncRPCClient, error) { rpcClient, err := NewNetworkClient(rpcAddress) @@ -36,7 +22,7 @@ func NewEncNetworkClient(rpcAddress string, viewingKey *viewingkey.ViewingKey, l } func NewEncNetworkClientFromConn(connection *gethrpc.Client, viewingKey *viewingkey.ViewingKey, logger gethlog.Logger) (*EncRPCClient, error) { - encClient, err := NewEncRPCClient(&NetworkClient{RpcClient: connection}, viewingKey, logger) + encClient, err := NewEncRPCClient(connection, viewingKey, logger) if err != nil { return nil, err } @@ -45,34 +31,5 @@ func NewEncNetworkClientFromConn(connection *gethrpc.Client, viewingKey *viewing // NewNetworkClient returns a client that can make RPC calls to an Obscuro node func NewNetworkClient(address string) (Client, error) { - if !strings.HasPrefix(address, http) && !strings.HasPrefix(address, ws) { - return nil, fmt.Errorf("clients for Obscuro only support the %s and %s protocols", http, ws) - } - - rpcClient, err := rpc.Dial(address) - if err != nil { - return nil, fmt.Errorf("could not create RPC client on %s. Cause: %w", address, err) - } - - return &NetworkClient{ - RpcClient: rpcClient, - }, nil -} - -// Call handles JSON rpc requests, delegating to the geth RPC client -// The result must be a pointer so that package json can unmarshal into it. You can also pass nil, in which case the result is ignored. -func (c *NetworkClient) Call(result interface{}, method string, args ...interface{}) error { - return c.RpcClient.Call(result, method, args...) -} - -func (c *NetworkClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { - return c.RpcClient.CallContext(ctx, result, method, args...) -} - -func (c *NetworkClient) Subscribe(ctx context.Context, _ interface{}, namespace string, channel interface{}, args ...interface{}) (*gethrpc.ClientSubscription, error) { - return c.RpcClient.Subscribe(ctx, namespace, channel, args...) -} - -func (c *NetworkClient) Stop() { - c.RpcClient.Close() + return rpc.Dial(address) } diff --git a/integration/simulation/p2p/in_mem_obscuro_client.go b/integration/simulation/p2p/in_mem_obscuro_client.go index 9026ce671f..02236be3d0 100644 --- a/integration/simulation/p2p/in_mem_obscuro_client.go +++ b/integration/simulation/p2p/in_mem_obscuro_client.go @@ -116,7 +116,7 @@ func (c *inMemObscuroClient) CallContext(_ context.Context, result interface{}, return c.Call(result, method, args...) //nolint: contextcheck } -func (c *inMemObscuroClient) Subscribe(context.Context, interface{}, string, interface{}, ...interface{}) (*gethrpc.ClientSubscription, error) { +func (c *inMemObscuroClient) Subscribe(context.Context, string, interface{}, ...interface{}) (*gethrpc.ClientSubscription, error) { panic("not implemented") } diff --git a/lib/gethfork/rpc/client.go b/lib/gethfork/rpc/client.go index 805f375441..f0d276dd89 100644 --- a/lib/gethfork/rpc/client.go +++ b/lib/gethfork/rpc/client.go @@ -116,6 +116,11 @@ type clientConn struct { handler *handler } +// Stop closes the client. +func (c *Client) Stop() { + c.Close() +} + func (c *Client) newClientConn(conn ServerCodec) *clientConn { ctx := context.Background() ctx = context.WithValue(ctx, clientContextKey{}, c) @@ -506,6 +511,7 @@ func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ... // before considering the subscriber dead. The subscription Err channel will receive // ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure // that the channel usually has at least one reader to prevent this issue. +// Subscribe creates a subscription to the Obscuro host. func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error) { // Check type of channel first. chanVal := reflect.ValueOf(channel) diff --git a/tools/walletextension/rpcapi/filter_api.go b/tools/walletextension/rpcapi/filter_api.go index 40299a5782..14492fb4c6 100644 --- a/tools/walletextension/rpcapi/filter_api.go +++ b/tools/walletextension/rpcapi/filter_api.go @@ -72,7 +72,7 @@ func (api *FilterAPI) Logs(ctx context.Context, crit common.FilterCriteria) (*rp connections = append(connections, rpcWSClient) inCh := make(chan common.IDAndLog) - backendSubscription, err := rpcWSClient.Subscribe(ctx, nil, "eth", inCh, "logs", crit) + backendSubscription, err := rpcWSClient.Subscribe(ctx, "eth", inCh, "logs", crit) if err != nil { fmt.Printf("could not connect to backend %s", err) return nil, err @@ -190,7 +190,7 @@ func handleUnsubscribe(connectionSub *rpc.Subscription, backendSubscriptions []* backendSub.Unsubscribe() } for _, connection := range connections { - _ = returnConn(p, connection.Client()) + _ = returnConn(p, connection.BackingClient()) } } diff --git a/tools/walletextension/rpcapi/utils.go b/tools/walletextension/rpcapi/utils.go index 27366ee756..e6e1317968 100644 --- a/tools/walletextension/rpcapi/utils.go +++ b/tools/walletextension/rpcapi/utils.go @@ -257,7 +257,7 @@ func conn(p *pool.ObjectPool, account *GWAccount, logger gethlog.Logger) (*tenrp return encClient, nil } -func returnConn(p *pool.ObjectPool, conn *rpc.Client) error { +func returnConn(p *pool.ObjectPool, conn tenrpc.Client) error { return p.ReturnObject(context.Background(), conn) } @@ -266,7 +266,7 @@ func withEncRPCConnection[R any](w *Services, acct *GWAccount, execute func(*ten if err != nil { return nil, fmt.Errorf("could not connect to backed. Cause: %w", err) } - defer returnConn(w.rpcHTTPConnPool, rpcClient.Client()) + defer returnConn(w.rpcHTTPConnPool, rpcClient.BackingClient()) return execute(rpcClient) } diff --git a/tools/walletextension/rpcapi/wallet_extension.go b/tools/walletextension/rpcapi/wallet_extension.go index b8b6891016..6e271a86dd 100644 --- a/tools/walletextension/rpcapi/wallet_extension.go +++ b/tools/walletextension/rpcapi/wallet_extension.go @@ -7,6 +7,8 @@ import ( "fmt" "time" + "github.com/ten-protocol/go-ten/go/obsclient" + pool "github.com/jolestar/go-commons-pool/v2" gethrpc "github.com/ten-protocol/go-ten/lib/gethfork/rpc" @@ -14,15 +16,12 @@ import ( "github.com/ten-protocol/go-ten/tools/walletextension/cache" - "github.com/ten-protocol/go-ten/go/obsclient" - gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/ecies" gethlog "github.com/ethereum/go-ethereum/log" "github.com/ten-protocol/go-ten/go/common/stopcontrol" "github.com/ten-protocol/go-ten/go/common/viewingkey" - "github.com/ten-protocol/go-ten/go/rpc" "github.com/ten-protocol/go-ten/tools/walletextension/common" "github.com/ten-protocol/go-ten/tools/walletextension/storage" ) @@ -36,7 +35,6 @@ type Services struct { FileLogger gethlog.Logger stopControl *stopcontrol.StopControl version string - tenClient *obsclient.ObsClient Cache cache.Cache // the OG maintains a connection pool of rpc connections to underlying nodes rpcHTTPConnPool *pool.ObjectPool @@ -45,12 +43,6 @@ type Services struct { } func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage, stopControl *stopcontrol.StopControl, version string, logger gethlog.Logger, config *common.Config) *Services { - rpcClient, err := rpc.NewNetworkClient(hostAddrHTTP) - if err != nil { - logger.Error(fmt.Errorf("could not create RPC client on %s. Cause: %w", hostAddrHTTP, err).Error()) - panic(err) - } - newTenClient := obsclient.NewObsClient(rpcClient) newFileLogger := common.NewFileLogger() newGatewayCache, err := cache.NewCache(logger) if err != nil { @@ -85,8 +77,7 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage }, nil, nil, nil) cfg := pool.NewDefaultPoolConfig() - cfg.MaxTotal = 100 - cfg.MaxTotal = 50 + cfg.MaxTotal = 100 // todo - what is the right number return &Services{ HostAddrHTTP: hostAddrHTTP, @@ -96,7 +87,6 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage FileLogger: newFileLogger, stopControl: stopControl, version: version, - tenClient: newTenClient, Cache: newGatewayCache, rpcHTTPConnPool: pool.NewObjectPool(context.Background(), factoryHTTP, cfg), rpcWSConnPool: pool.NewObjectPool(context.Background(), factoryWS, cfg), @@ -223,7 +213,11 @@ func (w *Services) Version() string { } func (w *Services) GetTenNodeHealthStatus() (bool, error) { - return w.tenClient.Health() + res, err := withPlainRPCConnection[bool](w, func(client *gethrpc.Client) (*bool, error) { + res, err := obsclient.NewObsClient(client).Health() + return &res, err + }) + return *res, err } func (w *Services) GenerateUserMessageToSign(encryptionToken []byte, formatsSlice []string) (string, error) {