Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align RPC interfaces #1856

Merged
merged 2 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/obsclient/authclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (ac *AuthObsClient) BalanceAt(ctx context.Context, blockNumber *big.Int) (*
}

func (ac *AuthObsClient) SubscribeFilterLogs(ctx context.Context, filterCriteria common.FilterCriteria, ch chan common.IDAndLog) (ethereum.Subscription, error) {
return ac.rpcClient.Subscribe(ctx, nil, rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeLogs, filterCriteria)
return ac.rpcClient.Subscribe(ctx, rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeLogs, filterCriteria)
}

func (ac *AuthObsClient) GetLogs(ctx context.Context, filterCriteria common.FilterCriteria) ([]*types.Log, error) {
Expand Down
2 changes: 1 addition & 1 deletion go/obsclient/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (m *rpcClientMock) CallContext(ctx context.Context, result interface{}, met
return arguments.Error(0)
}

func (m *rpcClientMock) Subscribe(context.Context, interface{}, string, interface{}, ...interface{}) (*rpc.ClientSubscription, error) {
func (m *rpcClientMock) Subscribe(context.Context, string, interface{}, ...interface{}) (*rpc.ClientSubscription, error) {
panic("not implemented")
}

Expand Down
2 changes: 1 addition & 1 deletion go/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Client interface {
// CallContext If the context is canceled before the call has successfully returned, CallContext returns immediately.
CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
// Subscribe creates a subscription to the Obscuro host.
Subscribe(ctx context.Context, result interface{}, namespace string, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error)
Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error)
// Stop closes the client.
Stop()
}
14 changes: 4 additions & 10 deletions go/rpc/encrypted_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,8 @@ func NewEncRPCClient(client Client, viewingKey *viewingkey.ViewingKey, logger ge
return encClient, nil
}

func (c *EncRPCClient) Client() *gethrpc.Client {
switch backingClient := c.obscuroClient.(type) {
case *NetworkClient:
return backingClient.RpcClient
default:
// not supported
return nil
}
func (c *EncRPCClient) BackingClient() Client {
return c.obscuroClient
}

// Call handles JSON rpc requests without a context - see CallContext for details
Expand All @@ -101,7 +95,7 @@ func (c *EncRPCClient) CallContext(ctx context.Context, result interface{}, meth
return c.executeSensitiveCall(ctx, result, method, args...)
}

func (c *EncRPCClient) Subscribe(ctx context.Context, _ interface{}, namespace string, ch interface{}, args ...interface{}) (*gethrpc.ClientSubscription, error) {
func (c *EncRPCClient) Subscribe(ctx context.Context, namespace string, ch interface{}, args ...interface{}) (*gethrpc.ClientSubscription, error) {
if len(args) == 0 {
return nil, fmt.Errorf("subscription did not specify its type")
}
Expand Down Expand Up @@ -131,7 +125,7 @@ func (c *EncRPCClient) Subscribe(ctx context.Context, _ interface{}, namespace s
return nil, fmt.Errorf("expected a channel of type `chan types.Log`, got %T", ch)
}
clientChannel := make(chan common.IDAndEncLog)
subscriptionToObscuro, err := c.obscuroClient.Subscribe(ctx, nil, namespace, clientChannel, subscriptionType, encryptedParams)
subscriptionToObscuro, err := c.obscuroClient.Subscribe(ctx, namespace, clientChannel, subscriptionType, encryptedParams)
if err != nil {
return nil, err
}
Expand Down
47 changes: 2 additions & 45 deletions go/rpc/network_client.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,13 @@
package rpc

import (
"context"
"fmt"
"strings"

"github.com/ten-protocol/go-ten/go/common/viewingkey"
"github.com/ten-protocol/go-ten/lib/gethfork/rpc"
gethrpc "github.com/ten-protocol/go-ten/lib/gethfork/rpc"

gethlog "github.com/ethereum/go-ethereum/log"
)

const (
ws = "ws://"
http = "http://"
)

// NetworkClient is a Client implementation that wraps Geth's rpc.Client to make calls to the obscuro node
type NetworkClient struct {
RpcClient *rpc.Client
}

// NewEncNetworkClient returns a network RPC client with Viewing Key encryption/decryption
func NewEncNetworkClient(rpcAddress string, viewingKey *viewingkey.ViewingKey, logger gethlog.Logger) (*EncRPCClient, error) {
rpcClient, err := NewNetworkClient(rpcAddress)
Expand All @@ -36,7 +22,7 @@ func NewEncNetworkClient(rpcAddress string, viewingKey *viewingkey.ViewingKey, l
}

func NewEncNetworkClientFromConn(connection *gethrpc.Client, viewingKey *viewingkey.ViewingKey, logger gethlog.Logger) (*EncRPCClient, error) {
encClient, err := NewEncRPCClient(&NetworkClient{RpcClient: connection}, viewingKey, logger)
encClient, err := NewEncRPCClient(connection, viewingKey, logger)
if err != nil {
return nil, err
}
Expand All @@ -45,34 +31,5 @@ func NewEncNetworkClientFromConn(connection *gethrpc.Client, viewingKey *viewing

// NewNetworkClient returns a client that can make RPC calls to an Obscuro node
func NewNetworkClient(address string) (Client, error) {
if !strings.HasPrefix(address, http) && !strings.HasPrefix(address, ws) {
return nil, fmt.Errorf("clients for Obscuro only support the %s and %s protocols", http, ws)
}

rpcClient, err := rpc.Dial(address)
if err != nil {
return nil, fmt.Errorf("could not create RPC client on %s. Cause: %w", address, err)
}

return &NetworkClient{
RpcClient: rpcClient,
}, nil
}

// Call handles JSON rpc requests, delegating to the geth RPC client
// The result must be a pointer so that package json can unmarshal into it. You can also pass nil, in which case the result is ignored.
func (c *NetworkClient) Call(result interface{}, method string, args ...interface{}) error {
return c.RpcClient.Call(result, method, args...)
}

func (c *NetworkClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
return c.RpcClient.CallContext(ctx, result, method, args...)
}

func (c *NetworkClient) Subscribe(ctx context.Context, _ interface{}, namespace string, channel interface{}, args ...interface{}) (*gethrpc.ClientSubscription, error) {
return c.RpcClient.Subscribe(ctx, namespace, channel, args...)
}

func (c *NetworkClient) Stop() {
c.RpcClient.Close()
return rpc.Dial(address)
}
2 changes: 1 addition & 1 deletion integration/simulation/p2p/in_mem_obscuro_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (c *inMemObscuroClient) CallContext(_ context.Context, result interface{},
return c.Call(result, method, args...) //nolint: contextcheck
}

func (c *inMemObscuroClient) Subscribe(context.Context, interface{}, string, interface{}, ...interface{}) (*gethrpc.ClientSubscription, error) {
func (c *inMemObscuroClient) Subscribe(context.Context, string, interface{}, ...interface{}) (*gethrpc.ClientSubscription, error) {
panic("not implemented")
}

Expand Down
6 changes: 6 additions & 0 deletions lib/gethfork/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ type clientConn struct {
handler *handler
}

// Stop closes the client.
func (c *Client) Stop() {
c.Close()
}

func (c *Client) newClientConn(conn ServerCodec) *clientConn {
ctx := context.Background()
ctx = context.WithValue(ctx, clientContextKey{}, c)
Expand Down Expand Up @@ -506,6 +511,7 @@ func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...
// before considering the subscriber dead. The subscription Err channel will receive
// ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure
// that the channel usually has at least one reader to prevent this issue.
// Subscribe creates a subscription to the Obscuro host.
func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
// Check type of channel first.
chanVal := reflect.ValueOf(channel)
Expand Down
4 changes: 2 additions & 2 deletions tools/walletextension/rpcapi/filter_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (api *FilterAPI) Logs(ctx context.Context, crit common.FilterCriteria) (*rp
connections = append(connections, rpcWSClient)

inCh := make(chan common.IDAndLog)
backendSubscription, err := rpcWSClient.Subscribe(ctx, nil, "eth", inCh, "logs", crit)
backendSubscription, err := rpcWSClient.Subscribe(ctx, "eth", inCh, "logs", crit)
if err != nil {
fmt.Printf("could not connect to backend %s", err)
return nil, err
Expand Down Expand Up @@ -190,7 +190,7 @@ func handleUnsubscribe(connectionSub *rpc.Subscription, backendSubscriptions []*
backendSub.Unsubscribe()
}
for _, connection := range connections {
_ = returnConn(p, connection.Client())
_ = returnConn(p, connection.BackingClient())
}
}

Expand Down
4 changes: 2 additions & 2 deletions tools/walletextension/rpcapi/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func conn(p *pool.ObjectPool, account *GWAccount, logger gethlog.Logger) (*tenrp
return encClient, nil
}

func returnConn(p *pool.ObjectPool, conn *rpc.Client) error {
func returnConn(p *pool.ObjectPool, conn tenrpc.Client) error {
return p.ReturnObject(context.Background(), conn)
}

Expand All @@ -266,7 +266,7 @@ func withEncRPCConnection[R any](w *Services, acct *GWAccount, execute func(*ten
if err != nil {
return nil, fmt.Errorf("could not connect to backed. Cause: %w", err)
}
defer returnConn(w.rpcHTTPConnPool, rpcClient.Client())
defer returnConn(w.rpcHTTPConnPool, rpcClient.BackingClient())
return execute(rpcClient)
}

Expand Down
22 changes: 8 additions & 14 deletions tools/walletextension/rpcapi/wallet_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,21 @@ import (
"fmt"
"time"

"github.com/ten-protocol/go-ten/go/obsclient"

pool "github.com/jolestar/go-commons-pool/v2"
gethrpc "github.com/ten-protocol/go-ten/lib/gethfork/rpc"

"github.com/status-im/keycard-go/hexutils"

"github.com/ten-protocol/go-ten/tools/walletextension/cache"

"github.com/ten-protocol/go-ten/go/obsclient"

gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/ecies"
gethlog "github.com/ethereum/go-ethereum/log"
"github.com/ten-protocol/go-ten/go/common/stopcontrol"
"github.com/ten-protocol/go-ten/go/common/viewingkey"
"github.com/ten-protocol/go-ten/go/rpc"
"github.com/ten-protocol/go-ten/tools/walletextension/common"
"github.com/ten-protocol/go-ten/tools/walletextension/storage"
)
Expand All @@ -36,7 +35,6 @@ type Services struct {
FileLogger gethlog.Logger
stopControl *stopcontrol.StopControl
version string
tenClient *obsclient.ObsClient
Cache cache.Cache
// the OG maintains a connection pool of rpc connections to underlying nodes
rpcHTTPConnPool *pool.ObjectPool
Expand All @@ -45,12 +43,6 @@ type Services struct {
}

func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage, stopControl *stopcontrol.StopControl, version string, logger gethlog.Logger, config *common.Config) *Services {
rpcClient, err := rpc.NewNetworkClient(hostAddrHTTP)
if err != nil {
logger.Error(fmt.Errorf("could not create RPC client on %s. Cause: %w", hostAddrHTTP, err).Error())
panic(err)
}
newTenClient := obsclient.NewObsClient(rpcClient)
newFileLogger := common.NewFileLogger()
newGatewayCache, err := cache.NewCache(logger)
if err != nil {
Expand Down Expand Up @@ -85,8 +77,7 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage
}, nil, nil, nil)

cfg := pool.NewDefaultPoolConfig()
cfg.MaxTotal = 100
cfg.MaxTotal = 50
cfg.MaxTotal = 100 // todo - what is the right number

return &Services{
HostAddrHTTP: hostAddrHTTP,
Expand All @@ -96,7 +87,6 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage
FileLogger: newFileLogger,
stopControl: stopControl,
version: version,
tenClient: newTenClient,
Cache: newGatewayCache,
rpcHTTPConnPool: pool.NewObjectPool(context.Background(), factoryHTTP, cfg),
rpcWSConnPool: pool.NewObjectPool(context.Background(), factoryWS, cfg),
Expand Down Expand Up @@ -223,7 +213,11 @@ func (w *Services) Version() string {
}

func (w *Services) GetTenNodeHealthStatus() (bool, error) {
return w.tenClient.Health()
res, err := withPlainRPCConnection[bool](w, func(client *gethrpc.Client) (*bool, error) {
res, err := obsclient.NewObsClient(client).Health()
return &res, err
})
return *res, err
}

func (w *Services) GenerateUserMessageToSign(encryptionToken []byte, formatsSlice []string) (string, error) {
Expand Down
Loading