Skip to content

Commit

Permalink
align internal interface
Browse files Browse the repository at this point in the history
  • Loading branch information
tudor-malene committed Mar 28, 2024
1 parent 2ab73cf commit 6606773
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 77 deletions.
2 changes: 1 addition & 1 deletion go/obsclient/authclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (ac *AuthObsClient) BalanceAt(ctx context.Context, blockNumber *big.Int) (*
}

func (ac *AuthObsClient) SubscribeFilterLogs(ctx context.Context, filterCriteria common.FilterCriteria, ch chan common.IDAndLog) (ethereum.Subscription, error) {
return ac.rpcClient.Subscribe(ctx, nil, rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeLogs, filterCriteria)
return ac.rpcClient.Subscribe(ctx, rpc.SubscribeNamespace, ch, rpc.SubscriptionTypeLogs, filterCriteria)
}

func (ac *AuthObsClient) GetLogs(ctx context.Context, filterCriteria common.FilterCriteria) ([]*types.Log, error) {
Expand Down
2 changes: 1 addition & 1 deletion go/obsclient/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (m *rpcClientMock) CallContext(ctx context.Context, result interface{}, met
return arguments.Error(0)
}

func (m *rpcClientMock) Subscribe(context.Context, interface{}, string, interface{}, ...interface{}) (*rpc.ClientSubscription, error) {
func (m *rpcClientMock) Subscribe(context.Context, string, interface{}, ...interface{}) (*rpc.ClientSubscription, error) {
panic("not implemented")
}

Expand Down
2 changes: 1 addition & 1 deletion go/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Client interface {
// CallContext If the context is canceled before the call has successfully returned, CallContext returns immediately.
CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
// Subscribe creates a subscription to the Obscuro host.
Subscribe(ctx context.Context, result interface{}, namespace string, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error)
Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error)
// Stop closes the client.
Stop()
}
14 changes: 4 additions & 10 deletions go/rpc/encrypted_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,8 @@ func NewEncRPCClient(client Client, viewingKey *viewingkey.ViewingKey, logger ge
return encClient, nil
}

func (c *EncRPCClient) Client() *gethrpc.Client {
switch backingClient := c.obscuroClient.(type) {
case *NetworkClient:
return backingClient.RpcClient
default:
// not supported
return nil
}
func (c *EncRPCClient) BackingClient() Client {
return c.obscuroClient
}

// Call handles JSON rpc requests without a context - see CallContext for details
Expand All @@ -101,7 +95,7 @@ func (c *EncRPCClient) CallContext(ctx context.Context, result interface{}, meth
return c.executeSensitiveCall(ctx, result, method, args...)
}

func (c *EncRPCClient) Subscribe(ctx context.Context, _ interface{}, namespace string, ch interface{}, args ...interface{}) (*gethrpc.ClientSubscription, error) {
func (c *EncRPCClient) Subscribe(ctx context.Context, namespace string, ch interface{}, args ...interface{}) (*gethrpc.ClientSubscription, error) {
if len(args) == 0 {
return nil, fmt.Errorf("subscription did not specify its type")
}
Expand Down Expand Up @@ -131,7 +125,7 @@ func (c *EncRPCClient) Subscribe(ctx context.Context, _ interface{}, namespace s
return nil, fmt.Errorf("expected a channel of type `chan types.Log`, got %T", ch)
}
clientChannel := make(chan common.IDAndEncLog)
subscriptionToObscuro, err := c.obscuroClient.Subscribe(ctx, nil, namespace, clientChannel, subscriptionType, encryptedParams)
subscriptionToObscuro, err := c.obscuroClient.Subscribe(ctx, namespace, clientChannel, subscriptionType, encryptedParams)
if err != nil {
return nil, err
}
Expand Down
47 changes: 2 additions & 45 deletions go/rpc/network_client.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,13 @@
package rpc

import (
"context"
"fmt"
"strings"

"github.com/ten-protocol/go-ten/go/common/viewingkey"
"github.com/ten-protocol/go-ten/lib/gethfork/rpc"
gethrpc "github.com/ten-protocol/go-ten/lib/gethfork/rpc"

gethlog "github.com/ethereum/go-ethereum/log"
)

const (
ws = "ws://"
http = "http://"
)

// NetworkClient is a Client implementation that wraps Geth's rpc.Client to make calls to the obscuro node
type NetworkClient struct {
RpcClient *rpc.Client
}

// NewEncNetworkClient returns a network RPC client with Viewing Key encryption/decryption
func NewEncNetworkClient(rpcAddress string, viewingKey *viewingkey.ViewingKey, logger gethlog.Logger) (*EncRPCClient, error) {
rpcClient, err := NewNetworkClient(rpcAddress)
Expand All @@ -36,7 +22,7 @@ func NewEncNetworkClient(rpcAddress string, viewingKey *viewingkey.ViewingKey, l
}

func NewEncNetworkClientFromConn(connection *gethrpc.Client, viewingKey *viewingkey.ViewingKey, logger gethlog.Logger) (*EncRPCClient, error) {
encClient, err := NewEncRPCClient(&NetworkClient{RpcClient: connection}, viewingKey, logger)
encClient, err := NewEncRPCClient(connection, viewingKey, logger)
if err != nil {
return nil, err
}
Expand All @@ -45,34 +31,5 @@ func NewEncNetworkClientFromConn(connection *gethrpc.Client, viewingKey *viewing

// NewNetworkClient returns a client that can make RPC calls to an Obscuro node
func NewNetworkClient(address string) (Client, error) {
if !strings.HasPrefix(address, http) && !strings.HasPrefix(address, ws) {
return nil, fmt.Errorf("clients for Obscuro only support the %s and %s protocols", http, ws)
}

rpcClient, err := rpc.Dial(address)
if err != nil {
return nil, fmt.Errorf("could not create RPC client on %s. Cause: %w", address, err)
}

return &NetworkClient{
RpcClient: rpcClient,
}, nil
}

// Call handles JSON rpc requests, delegating to the geth RPC client
// The result must be a pointer so that package json can unmarshal into it. You can also pass nil, in which case the result is ignored.
func (c *NetworkClient) Call(result interface{}, method string, args ...interface{}) error {
return c.RpcClient.Call(result, method, args...)
}

func (c *NetworkClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
return c.RpcClient.CallContext(ctx, result, method, args...)
}

func (c *NetworkClient) Subscribe(ctx context.Context, _ interface{}, namespace string, channel interface{}, args ...interface{}) (*gethrpc.ClientSubscription, error) {
return c.RpcClient.Subscribe(ctx, namespace, channel, args...)
}

func (c *NetworkClient) Stop() {
c.RpcClient.Close()
return rpc.Dial(address)
}
2 changes: 1 addition & 1 deletion integration/simulation/p2p/in_mem_obscuro_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (c *inMemObscuroClient) CallContext(_ context.Context, result interface{},
return c.Call(result, method, args...) //nolint: contextcheck
}

func (c *inMemObscuroClient) Subscribe(context.Context, interface{}, string, interface{}, ...interface{}) (*gethrpc.ClientSubscription, error) {
func (c *inMemObscuroClient) Subscribe(context.Context, string, interface{}, ...interface{}) (*gethrpc.ClientSubscription, error) {
panic("not implemented")
}

Expand Down
4 changes: 4 additions & 0 deletions lib/gethfork/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type clientConn struct {
handler *handler
}

// Stop closes the client.
func (c *Client) Stop() {}

func (c *Client) newClientConn(conn ServerCodec) *clientConn {
ctx := context.Background()
ctx = context.WithValue(ctx, clientContextKey{}, c)
Expand Down Expand Up @@ -506,6 +509,7 @@ func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...
// before considering the subscriber dead. The subscription Err channel will receive
// ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure
// that the channel usually has at least one reader to prevent this issue.
// Subscribe creates a subscription to the Obscuro host.
func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
// Check type of channel first.
chanVal := reflect.ValueOf(channel)
Expand Down
4 changes: 2 additions & 2 deletions tools/walletextension/rpcapi/filter_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (api *FilterAPI) Logs(ctx context.Context, crit common.FilterCriteria) (*rp
connections = append(connections, rpcWSClient)

inCh := make(chan common.IDAndLog)
backendSubscription, err := rpcWSClient.Subscribe(ctx, nil, "eth", inCh, "logs", crit)
backendSubscription, err := rpcWSClient.Subscribe(ctx, "eth", inCh, "logs", crit)
if err != nil {
fmt.Printf("could not connect to backend %s", err)
return nil, err
Expand Down Expand Up @@ -190,7 +190,7 @@ func handleUnsubscribe(connectionSub *rpc.Subscription, backendSubscriptions []*
backendSub.Unsubscribe()
}
for _, connection := range connections {
_ = returnConn(p, connection.Client())
_ = returnConn(p, connection.BackingClient())
}
}

Expand Down
4 changes: 2 additions & 2 deletions tools/walletextension/rpcapi/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func conn(p *pool.ObjectPool, account *GWAccount, logger gethlog.Logger) (*tenrp
return encClient, nil
}

func returnConn(p *pool.ObjectPool, conn *rpc.Client) error {
func returnConn(p *pool.ObjectPool, conn tenrpc.Client) error {
return p.ReturnObject(context.Background(), conn)
}

Expand All @@ -266,7 +266,7 @@ func withEncRPCConnection[R any](w *Services, acct *GWAccount, execute func(*ten
if err != nil {
return nil, fmt.Errorf("could not connect to backed. Cause: %w", err)
}
defer returnConn(w.rpcHTTPConnPool, rpcClient.Client())
defer returnConn(w.rpcHTTPConnPool, rpcClient.BackingClient())
return execute(rpcClient)
}

Expand Down
22 changes: 8 additions & 14 deletions tools/walletextension/rpcapi/wallet_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,21 @@ import (
"fmt"
"time"

"github.com/ten-protocol/go-ten/go/obsclient"

pool "github.com/jolestar/go-commons-pool/v2"
gethrpc "github.com/ten-protocol/go-ten/lib/gethfork/rpc"

"github.com/status-im/keycard-go/hexutils"

"github.com/ten-protocol/go-ten/tools/walletextension/cache"

"github.com/ten-protocol/go-ten/go/obsclient"

gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/ecies"
gethlog "github.com/ethereum/go-ethereum/log"
"github.com/ten-protocol/go-ten/go/common/stopcontrol"
"github.com/ten-protocol/go-ten/go/common/viewingkey"
"github.com/ten-protocol/go-ten/go/rpc"
"github.com/ten-protocol/go-ten/tools/walletextension/common"
"github.com/ten-protocol/go-ten/tools/walletextension/storage"
)
Expand All @@ -36,7 +35,6 @@ type Services struct {
FileLogger gethlog.Logger
stopControl *stopcontrol.StopControl
version string
tenClient *obsclient.ObsClient
Cache cache.Cache
// the OG maintains a connection pool of rpc connections to underlying nodes
rpcHTTPConnPool *pool.ObjectPool
Expand All @@ -45,12 +43,6 @@ type Services struct {
}

func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage, stopControl *stopcontrol.StopControl, version string, logger gethlog.Logger, config *common.Config) *Services {
rpcClient, err := rpc.NewNetworkClient(hostAddrHTTP)
if err != nil {
logger.Error(fmt.Errorf("could not create RPC client on %s. Cause: %w", hostAddrHTTP, err).Error())
panic(err)
}
newTenClient := obsclient.NewObsClient(rpcClient)
newFileLogger := common.NewFileLogger()
newGatewayCache, err := cache.NewCache(logger)
if err != nil {
Expand Down Expand Up @@ -85,8 +77,7 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage
}, nil, nil, nil)

cfg := pool.NewDefaultPoolConfig()
cfg.MaxTotal = 100
cfg.MaxTotal = 50
cfg.MaxTotal = 100 // todo - what is the right number

return &Services{
HostAddrHTTP: hostAddrHTTP,
Expand All @@ -96,7 +87,6 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.Storage
FileLogger: newFileLogger,
stopControl: stopControl,
version: version,
tenClient: newTenClient,
Cache: newGatewayCache,
rpcHTTPConnPool: pool.NewObjectPool(context.Background(), factoryHTTP, cfg),
rpcWSConnPool: pool.NewObjectPool(context.Background(), factoryWS, cfg),
Expand Down Expand Up @@ -223,7 +213,11 @@ func (w *Services) Version() string {
}

func (w *Services) GetTenNodeHealthStatus() (bool, error) {
return w.tenClient.Health()
res, err := withPlainRPCConnection[bool](w, func(client *gethrpc.Client) (*bool, error) {
res, err := obsclient.NewObsClient(client).Health()
return &res, err
})
return *res, err
}

func (w *Services) GenerateUserMessageToSign(encryptionToken []byte, formatsSlice []string) (string, error) {
Expand Down

0 comments on commit 6606773

Please sign in to comment.