Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ten gateway use http instead of ws #1743

Merged
merged 12 commits into from
Jan 17, 2024
83 changes: 81 additions & 2 deletions integration/obscurogateway/tengateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,13 @@ func TestTenGateway(t *testing.T) {

// run the tests against the exis
for name, test := range map[string]func(*testing.T, string, string, wallet.Wallet){
//"testAreTxsMinted": testAreTxsMinted, this breaks the other tests bc, enable once concurency issues are fixed
//"testAreTxsMinted": testAreTxsMinted, this breaks the other tests bc, enable once concurrency issues are fixed
"testErrorHandling": testErrorHandling,
"testMultipleAccountsSubscription": testMultipleAccountsSubscription,
"testErrorsRevertedArePassed": testErrorsRevertedArePassed,
"testUnsubscribe": testUnsubscribe,
"testClosingConnectionWhileSubscribed": testClosingConnectionWhileSubscribed,
"testSubscriptionTopics": testSubscriptionTopics,
otherview marked this conversation as resolved.
Show resolved Hide resolved
} {
t.Run(name, func(t *testing.T) {
test(t, httpURL, wsURL, w)
Expand Down Expand Up @@ -244,6 +245,84 @@ func testMultipleAccountsSubscription(t *testing.T, httpURL, wsURL string, w wal
assert.Equal(t, 3, len(user2logs))
}

func testSubscriptionTopics(t *testing.T, httpURL, wsURL string, w wallet.Wallet) {
user0, err := NewUser([]wallet.Wallet{w}, httpURL, wsURL)
require.NoError(t, err)
fmt.Printf("Created user with encryption token: %s\n", user0.tgClient.UserID())

user1, err := NewUser([]wallet.Wallet{datagenerator.RandomWallet(integration.TenChainID), datagenerator.RandomWallet(integration.TenChainID)}, httpURL, wsURL)
require.NoError(t, err)
fmt.Printf("Created user with encryption token: %s\n", user0.tgClient.UserID())

// register all the accounts for that user
err = user0.RegisterAccounts()
require.NoError(t, err)
err = user1.RegisterAccounts()
require.NoError(t, err)

var amountToTransfer int64 = 1_000_000_000_000_000_000
// Transfer some funds to user1 to be able to make transactions
_, err = transferETHToAddress(user0.HTTPClient, user0.Wallets[0], user1.Wallets[0].Address(), amountToTransfer)
require.NoError(t, err)
_, err = transferETHToAddress(user0.HTTPClient, user0.Wallets[0], user1.Wallets[1].Address(), amountToTransfer)
require.NoError(t, err)

// Print balances of all registered accounts to check if all accounts have funds
err = user0.PrintUserAccountsBalances()
require.NoError(t, err)
err = user1.PrintUserAccountsBalances()
require.NoError(t, err)

// deploy events contract
deployTx := &types.LegacyTx{
Nonce: w.GetNonceAndIncrement(),
Gas: uint64(1_000_000),
GasPrice: gethcommon.Big1,
Data: gethcommon.FromHex(eventsContractBytecode),
}

signedTx, err := w.SignTransaction(deployTx)
require.NoError(t, err)

err = user0.HTTPClient.SendTransaction(context.Background(), signedTx)
require.NoError(t, err)

contractReceipt, err := integrationCommon.AwaitReceiptEth(context.Background(), user0.HTTPClient, signedTx.Hash(), time.Minute)
require.NoError(t, err)

// user0 subscribes to all events from that smart contract, user1 only an event with a topic of his first account
var user0logs []types.Log
var user1logs []types.Log
var topics [][]gethcommon.Hash
t1 := gethcommon.BytesToHash(user1.Wallets[1].Address().Bytes())
topics = append(topics, nil)
topics = append(topics, []gethcommon.Hash{t1})
subscribeToEvents([]gethcommon.Address{contractReceipt.ContractAddress}, nil, user0.WSClient, &user0logs)
subscribeToEvents([]gethcommon.Address{contractReceipt.ContractAddress}, topics, user1.WSClient, &user1logs)

// user0 calls setMessage on deployed smart contract with the account twice and expects two events
_, err = integrationCommon.InteractWithSmartContract(user0.HTTPClient, user0.Wallets[0], eventsContractABI, "setMessage", "user0Event1", contractReceipt.ContractAddress)
require.NoError(t, err)
_, err = integrationCommon.InteractWithSmartContract(user0.HTTPClient, user0.Wallets[0], eventsContractABI, "setMessage", "user0Event2", contractReceipt.ContractAddress)
require.NoError(t, err)

// user1 calls setMessage on deployed smart contract with two different accounts and expects only one event,
// because only the first address is in the topic filter of the subscription
_, err = integrationCommon.InteractWithSmartContract(user1.HTTPClient, user1.Wallets[0], eventsContractABI, "setMessage", "user1Event1", contractReceipt.ContractAddress)
require.NoError(t, err)
_, err = integrationCommon.InteractWithSmartContract(user1.HTTPClient, user1.Wallets[1], eventsContractABI, "setMessage", "user1Event2", contractReceipt.ContractAddress)
require.NoError(t, err)

// wait a few seconds to be completely sure all events arrived
time.Sleep(time.Second * 3)

// Assert the number of logs received by each client
// user0 should see two lifecycle events (1 for each interaction with the smart contract)
assert.Equal(t, 2, len(user0logs))
// user1 should see only one event (the other is filtered out because of the topic filter)
assert.Equal(t, 1, len(user1logs))
}

func testAreTxsMinted(t *testing.T, httpURL, wsURL string, w wallet.Wallet) { //nolint: unused
// set up the tgClient
ogClient := lib.NewTenGatewayLibrary(httpURL, wsURL)
Expand Down Expand Up @@ -588,7 +667,7 @@ func transferETHToAddress(client *ethclient.Client, wallet wallet.Wallet, toAddr
return integrationCommon.AwaitReceiptEth(context.Background(), client, signedTx.Hash(), 20*time.Second)
}

func subscribeToEvents(addresses []gethcommon.Address, topics [][]gethcommon.Hash, client *ethclient.Client, logs *[]types.Log) ethereum.Subscription { //nolint:unparam
func subscribeToEvents(addresses []gethcommon.Address, topics [][]gethcommon.Hash, client *ethclient.Client, logs *[]types.Log) ethereum.Subscription {
// Make a subscription
filterQuery := ethereum.FilterQuery{
Addresses: addresses,
Expand Down
117 changes: 78 additions & 39 deletions tools/walletextension/accountmanager/account_manager.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
package accountmanager

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"

"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ten-protocol/go-ten/go/common"

"github.com/ten-protocol/go-ten/tools/walletextension/storage"

"github.com/ten-protocol/go-ten/tools/walletextension/subscriptions"

"github.com/ten-protocol/go-ten/go/common/gethencoding"

gethlog "github.com/ethereum/go-ethereum/log"

"github.com/ten-protocol/go-ten/go/common"

wecommon "github.com/ten-protocol/go-ten/tools/walletextension/common"

"github.com/ten-protocol/go-ten/go/rpc"
Expand All @@ -31,22 +34,27 @@
ErrNoViewingKey = "method %s cannot be called with an unauthorised client - no signed viewing keys found"
)

// AccountManager provides a single location for code that helps wallet extension in determining the appropriate
// account to use to send a request when multiple are registered
// AccountManager provides a single location for code that helps the gateway in determining the appropriate
// account to use to send a request for selected user when multiple accounts are registered
type AccountManager struct {
unauthedClient rpc.Client
// todo (@ziga) - create two types of clients - WS clients, and HTTP clients - to not create WS clients unnecessarily.
userID string
unauthedClient rpc.Client
accountsMutex sync.RWMutex
accountClients map[gethcommon.Address]*rpc.EncRPCClient // An encrypted RPC client per registered account
accountClientsHTTP map[gethcommon.Address]*rpc.EncRPCClient // An encrypted RPC http client per registered account
hostRPCBindAddrWS string
subscriptionsManager *subscriptions.SubscriptionManager
storage storage.Storage
logger gethlog.Logger
}

func NewAccountManager(unauthedClient rpc.Client, logger gethlog.Logger) *AccountManager {
func NewAccountManager(userID string, unauthedClient rpc.Client, hostRPCBindAddressWS string, storage storage.Storage, logger gethlog.Logger) *AccountManager {
return &AccountManager{
userID: userID,
unauthedClient: unauthedClient,
accountClients: make(map[gethcommon.Address]*rpc.EncRPCClient),
accountClientsHTTP: make(map[gethcommon.Address]*rpc.EncRPCClient),
hostRPCBindAddrWS: hostRPCBindAddressWS,
subscriptionsManager: subscriptions.New(logger),
storage: storage,
logger: logger,
}
}
Expand All @@ -56,8 +64,8 @@
m.accountsMutex.RLock()
defer m.accountsMutex.RUnlock()

addresses := make([]string, 0, len(m.accountClients))
for address := range m.accountClients {
addresses := make([]string, 0, len(m.accountClientsHTTP))
for address := range m.accountClientsHTTP {
addresses = append(addresses, address.Hex())
}
return addresses
Expand All @@ -67,10 +75,10 @@
func (m *AccountManager) AddClient(address gethcommon.Address, client *rpc.EncRPCClient) {
m.accountsMutex.Lock()
defer m.accountsMutex.Unlock()
m.accountClients[address] = client
m.accountClientsHTTP[address] = client
}

// ProxyRequest tries to identify the correct EncRPCClient to proxy the request to the Obscuro node, or it will attempt
// ProxyRequest tries to identify the correct EncRPCClient to proxy the request to the Ten node, or it will attempt
// the request with all clients until it succeeds
func (m *AccountManager) ProxyRequest(rpcReq *wecommon.RPCRequest, rpcResp *interface{}, userConn userconn.UserConn) error {
// We need to handle a special case for subscribing and unsubscribing from events,
Expand Down Expand Up @@ -103,29 +111,51 @@

const emptyFilterCriteria = "[]" // This is the value that gets passed for an empty filter criteria.

// determine the client based on the topics
// if none is found use all clients from current user
// suggestSubscriptionClient returns clients that should be used for the subscription request.
// For other requests we use http clients, but for subscriptions ws clients are required, that is the reason for
// creating ws clients here.
// We only want to have the connections open for the duration of the subscription, so we create the clients here and
// don't store them in the accountClients map.
func (m *AccountManager) suggestSubscriptionClient(rpcReq *wecommon.RPCRequest) ([]rpc.Client, error) {
m.accountsMutex.RLock()
defer m.accountsMutex.RUnlock()

clients := make([]rpc.Client, 0, len(m.accountClients))
userIDBytes, err := wecommon.GetUserIDbyte(m.userID)
if err != nil {
return nil, fmt.Errorf("error decoding string (%s), %w", m.userID, err)
}

accounts, err := m.storage.GetAccounts(userIDBytes)
if err != nil {
return nil, fmt.Errorf("error getting accounts for user: %s, %w", m.userID, err)
}

// by default, if no client is identified as a candidate, then subscribe to all accounts
for _, c := range m.accountClients {
clients = append(clients, c)
userPrivateKey, err := m.storage.GetUserPrivateKey(userIDBytes)
if err != nil {
return nil, fmt.Errorf("error getting private key for user: %s, %w", m.userID, err)
}

if len(rpcReq.Params) < 2 {
return clients, nil
if len(rpcReq.Params) > 1 {
filteredAccounts, err := m.filterAccounts(rpcReq, accounts, userPrivateKey)
if err != nil {
return nil, err
}
// return filtered clients if we found any
if len(filteredAccounts) > 0 {
return m.createClientsForAccounts(filteredAccounts, userPrivateKey)
zkokelj marked this conversation as resolved.
Show resolved Hide resolved
}
}
// create clients for all accounts if we didn't find any clients that match the filter or if no topics were provided
return m.createClientsForAccounts(accounts, userPrivateKey)
}

// The filter is the second parameter
// filterClients checks if any of the accounts match the filter criteria and returns those accounts
func (m *AccountManager) filterAccounts(rpcReq *wecommon.RPCRequest, accounts []wecommon.AccountDB, userPrivateKey []byte) ([]wecommon.AccountDB, error) {

Check warning on line 153 in tools/walletextension/accountmanager/account_manager.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'userPrivateKey' seems to be unused, consider removing or renaming it as _ (revive)
var filteredAccounts []wecommon.AccountDB
filterCriteriaJSON, err := json.Marshal(rpcReq.Params[1])
if err != nil {
return nil, fmt.Errorf("could not marshal filter criteria to JSON. Cause: %w", err)
}

filterCriteria := filters.FilterCriteria{}
if string(filterCriteriaJSON) != emptyFilterCriteria {
err = filterCriteria.UnmarshalJSON(filterCriteriaJSON)
Expand All @@ -134,45 +164,54 @@
}
}

// Go through each topic filter and look for registered addresses
for i, topicCondition := range filterCriteria.Topics {
// the first topic is always the signature of the event, so it can't be an address
if i == 0 {
continue
}
for _, topicCondition := range filterCriteria.Topics {
for _, topic := range topicCondition {
potentialAddr := common.ExtractPotentialAddress(topic)
m.logger.Info(fmt.Sprintf("Potential address (%s) found for the request %s", potentialAddr, rpcReq))
if potentialAddr != nil {
cl, found := m.accountClients[*potentialAddr]
if found {
m.logger.Info("Client found for potential address: ", potentialAddr)
return []rpc.Client{cl}, nil
for _, account := range accounts {
// if we find a match, we append the account to the list of filtered accounts
if bytes.Equal(account.AccountAddress, potentialAddr.Bytes()) {
filteredAccounts = append(filteredAccounts, account)
}
}
m.logger.Info("Potential address does not have a client", potentialAddr)
}
}
}

return filteredAccounts, nil
}

// createClientsForAllAccounts creates ws clients for all accounts for given user and returns them
func (m *AccountManager) createClientsForAccounts(accounts []wecommon.AccountDB, userPrivateKey []byte) ([]rpc.Client, error) {
clients := make([]rpc.Client, 0, len(accounts))
for _, account := range accounts {
encClient, err := wecommon.CreateEncClient(m.hostRPCBindAddrWS, account.AccountAddress, userPrivateKey, account.Signature, m.logger)
if err != nil {
m.logger.Error(fmt.Errorf("error creating new client, %w", err).Error())
continue
}
clients = append(clients, encClient)
}
return clients, nil
}

func (m *AccountManager) executeCall(rpcReq *wecommon.RPCRequest, rpcResp *interface{}) error {
m.accountsMutex.RLock()
defer m.accountsMutex.RUnlock()
// for obscuro RPC requests it is important we know the sender account for the viewing key encryption/decryption
suggestedClient := m.suggestAccountClient(rpcReq, m.accountClients)
// for Ten RPC requests, it is important we know the sender account for the viewing key encryption/decryption
suggestedClient := m.suggestAccountClient(rpcReq, m.accountClientsHTTP)

switch {
case suggestedClient != nil: // use the suggested client if there is one
// todo (@ziga) - if we have a suggested client, should we still loop through the other clients if it fails?
// The call data guessing won't often be wrong but there could be edge-cases there
return submitCall(suggestedClient, rpcReq, rpcResp)

case len(m.accountClients) > 0: // try registered clients until there's a successful execution
m.logger.Info(fmt.Sprintf("appropriate client not found, attempting request with up to %d clients", len(m.accountClients)))
case len(m.accountClientsHTTP) > 0: // try registered clients until there's a successful execution
m.logger.Info(fmt.Sprintf("appropriate client not found, attempting request with up to %d clients", len(m.accountClientsHTTP)))
var err error
for _, client := range m.accountClients {
for _, client := range m.accountClientsHTTP {
err = submitCall(client, rpcReq, rpcResp)
if err == nil || errors.Is(err, rpc.ErrNilResponse) {
// request didn't fail, we don't need to continue trying the other clients
Expand Down
1 change: 1 addition & 0 deletions tools/walletextension/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
PathHealth = "/health/"
PathNetworkHealth = "/network-health/"
WSProtocol = "ws://"
HTTPProtocol = "http://"
DefaultUser = "defaultUser"
UserQueryParameter = "u"
EncryptedTokenQueryParameter = "token"
Expand Down
13 changes: 7 additions & 6 deletions tools/walletextension/container/walletextension_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ type WalletExtensionContainer struct {

func NewWalletExtensionContainerFromConfig(config config.Config, logger gethlog.Logger) *WalletExtensionContainer {
// create the account manager with a single unauthenticated connection
hostRPCBindAddr := wecommon.WSProtocol + config.NodeRPCWebsocketAddress
unAuthedClient, err := rpc.NewNetworkClient(hostRPCBindAddr)
hostRPCBindAddrWS := wecommon.WSProtocol + config.NodeRPCWebsocketAddress
hostRPCBindAddrHTTP := wecommon.HTTPProtocol + config.NodeRPCHTTPAddress
unAuthedClient, err := rpc.NewNetworkClient(hostRPCBindAddrHTTP)
if err != nil {
logger.Crit("unable to create temporary client for request ", log.ErrKey, err)
os.Exit(1)
Expand All @@ -51,7 +52,7 @@ func NewWalletExtensionContainerFromConfig(config config.Config, logger gethlog.
logger.Crit("unable to create database to store viewing keys ", log.ErrKey, err)
os.Exit(1)
}
userAccountManager := useraccountmanager.NewUserAccountManager(unAuthedClient, logger, databaseStorage, hostRPCBindAddr)
userAccountManager := useraccountmanager.NewUserAccountManager(unAuthedClient, logger, databaseStorage, hostRPCBindAddrHTTP, hostRPCBindAddrWS)

// add default user (when no UserID is provided in the query parameter - for WE endpoints)
defaultUserAccountManager := userAccountManager.AddAndReturnAccountManager(hex.EncodeToString([]byte(wecommon.DefaultUser)))
Expand Down Expand Up @@ -84,7 +85,7 @@ func NewWalletExtensionContainerFromConfig(config config.Config, logger gethlog.
os.Exit(1)
}
for _, account := range accounts {
encClient, err := wecommon.CreateEncClient(hostRPCBindAddr, account.AccountAddress, user.PrivateKey, account.Signature, logger)
encClient, err := wecommon.CreateEncClient(hostRPCBindAddrWS, account.AccountAddress, user.PrivateKey, account.Signature, logger)
if err != nil {
logger.Error(fmt.Errorf("error creating new client, %w", err).Error())
os.Exit(1)
Expand All @@ -109,14 +110,14 @@ func NewWalletExtensionContainerFromConfig(config config.Config, logger gethlog.
}

stopControl := stopcontrol.New()
walletExt := walletextension.New(hostRPCBindAddr, &userAccountManager, databaseStorage, stopControl, version, logger, &config)
walletExt := walletextension.New(hostRPCBindAddrHTTP, hostRPCBindAddrWS, &userAccountManager, databaseStorage, stopControl, version, logger, &config)
httpRoutes := api.NewHTTPRoutes(walletExt)
httpServer := api.NewHTTPServer(fmt.Sprintf("%s:%d", config.WalletExtensionHost, config.WalletExtensionPortHTTP), httpRoutes)

wsRoutes := api.NewWSRoutes(walletExt)
wsServer := api.NewWSServer(fmt.Sprintf("%s:%d", config.WalletExtensionHost, config.WalletExtensionPortWS), wsRoutes)
return NewWalletExtensionContainer(
hostRPCBindAddr,
hostRPCBindAddrWS,
walletExt,
&userAccountManager,
databaseStorage,
Expand Down
Loading
Loading