From ce7d5702039693dd923adc10a415bbd2f15d1d5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=BDiga=20Kokelj?= Date: Thu, 19 Oct 2023 17:24:19 +0200 Subject: [PATCH] prepare for multiple subscriptions and add test --- .../obscurogateway/events_contract.sol | 31 ++ integration/obscurogateway/gateway_user.go | 121 +++++ .../obscurogateway/obscurogateway_test.go | 423 +++++++++++++++++- .../accountmanager/account_manager.go | 130 +----- tools/walletextension/api/utils.go | 5 +- tools/walletextension/common/common.go | 40 +- tools/walletextension/common/constants.go | 1 + .../subscriptions/subscriptions.go | 120 +++++ tools/walletextension/wallet_extension.go | 5 +- 9 files changed, 755 insertions(+), 121 deletions(-) create mode 100644 integration/obscurogateway/events_contract.sol create mode 100644 integration/obscurogateway/gateway_user.go create mode 100644 tools/walletextension/subscriptions/subscriptions.go diff --git a/integration/obscurogateway/events_contract.sol b/integration/obscurogateway/events_contract.sol new file mode 100644 index 0000000000..5ee90be5c2 --- /dev/null +++ b/integration/obscurogateway/events_contract.sol @@ -0,0 +1,31 @@ +// SPDX-License-Identifier: MIT +// Specify the Solidity version +pragma solidity ^0.8.0; + +contract SimpleMessageContract { + + // State variable to store the message + string public message; + string public message2; + + // Event declaration + event MessageUpdatedWithAddress(string newMessage, address indexed sender); + event Message2Updated(string newMessage); + + // Constructor to initialize the message + constructor() { + message = "foo"; + message2 = "foo"; + } + + // Function to set a new message + function setMessage(string memory newMessage) public { + message = newMessage; + emit MessageUpdatedWithAddress(newMessage, msg.sender); // Emit the event (only sender can see it) + } + + function setMessage2(string memory newMessage) public { + message2 = newMessage; + emit Message2Updated(newMessage); // Emit the event (everyone can see it) + } +} \ No newline at end of file diff --git a/integration/obscurogateway/gateway_user.go b/integration/obscurogateway/gateway_user.go new file mode 100644 index 0000000000..660b497e18 --- /dev/null +++ b/integration/obscurogateway/gateway_user.go @@ -0,0 +1,121 @@ +package faucet + +import ( + "context" + "crypto/ecdsa" + "encoding/hex" + "fmt" + "io" + "net/http" + "strings" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/obscuronet/go-obscuro/go/wallet" + "github.com/valyala/fasthttp" +) + +type GatewayUser struct { + UserID string + Wallets []wallet.Wallet + HTTPClient *ethclient.Client + WSClient *ethclient.Client + ServerAddressHTTP string + ServerAddressWS string +} + +func NewUser(wallets []wallet.Wallet, serverAddressHTTP string, serverAddressWS string) (*GatewayUser, error) { + // automatically join OG + userID, err := joinObscuroGateway(serverAddressHTTP) + if err != nil { + return nil, err + } + + // create clients + httpClient, err := ethclient.Dial(serverAddressHTTP + "/v1/" + "?u=" + userID) + if err != nil { + return nil, err + } + wsClient, err := ethclient.Dial(serverAddressWS + "/v1/" + "?u=" + userID) + if err != nil { + return nil, err + } + + return &GatewayUser{ + UserID: userID, + Wallets: wallets, + HTTPClient: httpClient, + WSClient: wsClient, + ServerAddressHTTP: serverAddressHTTP, + ServerAddressWS: serverAddressWS, + }, nil +} + +func (u GatewayUser) RegisterAccounts() error { + for _, w := range u.Wallets { + response, err := registerAccount(u.ServerAddressHTTP, u.UserID, w.PrivateKey(), w.Address().Hex()) + if err != nil { + return err + } + fmt.Printf("Successfully registered address %s for user: %s with response: %s \n", w.Address().Hex(), u.UserID, response) + } + + return nil +} + +func (u GatewayUser) PrintUserAccountsBalances() error { + for _, w := range u.Wallets { + balance, err := u.HTTPClient.BalanceAt(context.Background(), w.Address(), nil) + if err != nil { + return err + } + fmt.Println("Balance for account ", w.Address().Hex(), " - ", balance.String()) + } + return nil +} + +func registerAccount(url string, userID string, pk *ecdsa.PrivateKey, hexAddress string) ([]byte, error) { + payload := prepareRegisterPayload(userID, pk, hexAddress) + + req, err := http.NewRequestWithContext( + context.Background(), + http.MethodPost, + url+"/v1/authenticate/?u="+userID, + strings.NewReader(payload), + ) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json; charset=UTF-8") + + client := &http.Client{} + response, err := client.Do(req) + if err != nil { + return nil, err + } + + defer response.Body.Close() + return io.ReadAll(response.Body) +} + +func prepareRegisterPayload(userID string, pk *ecdsa.PrivateKey, hexAddress string) string { + message := fmt.Sprintf("Register %s for %s", userID, strings.ToLower(hexAddress)) + prefixedMessage := fmt.Sprintf("\u0019Ethereum Signed Message:\n%d%s", len(message), message) + messageHash := crypto.Keccak256([]byte(prefixedMessage)) + sig, err := crypto.Sign(messageHash, pk) + if err != nil { + fmt.Printf("Failed to sign message: %v\n", err) + } + sig[64] += 27 + signature := "0x" + hex.EncodeToString(sig) + payload := fmt.Sprintf("{\"signature\": \"%s\", \"message\": \"%s\"}", signature, message) + return payload +} + +func joinObscuroGateway(url string) (string, error) { + statusCode, userID, err := fasthttp.Get(nil, fmt.Sprintf("%s/v1/join/", url)) + if err != nil || statusCode != 200 { + return "", fmt.Errorf(fmt.Sprintf("Failed to get userID. Status code: %d, err: %s", statusCode, err)) + } + return string(userID), nil +} diff --git a/integration/obscurogateway/obscurogateway_test.go b/integration/obscurogateway/obscurogateway_test.go index f6cd9e3862..3a8e4ecfe1 100644 --- a/integration/obscurogateway/obscurogateway_test.go +++ b/integration/obscurogateway/obscurogateway_test.go @@ -3,6 +3,12 @@ package faucet import ( "context" "fmt" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/rlp" + "github.com/obscuronet/go-obscuro/go/common/retry" "math/big" "net/http" "strings" @@ -105,6 +111,254 @@ func TestObscuroGateway(t *testing.T) { assert.NoError(t, err) } +func TestObscuroGatewaySubscriptionsWithMultipleAccounts(t *testing.T) { + // t.Skip("Commented it out until more testing is driven from this test") + startPort := integration.StartPortObscuroGatewayUnitTest + wallets := createObscuroNetwork(t, startPort) + + obscuroGatewayConf := config.Config{ + WalletExtensionHost: "127.0.0.1", + WalletExtensionPortHTTP: startPort + integration.DefaultObscuroGatewayHTTPPortOffset, + WalletExtensionPortWS: startPort + integration.DefaultObscuroGatewayWSPortOffset, + NodeRPCHTTPAddress: fmt.Sprintf("127.0.0.1:%d", startPort+integration.DefaultHostRPCHTTPOffset), + NodeRPCWebsocketAddress: fmt.Sprintf("127.0.0.1:%d", startPort+integration.DefaultHostRPCWSOffset), + LogPath: "sys_out", + VerboseFlag: false, + DBType: "sqlite", + } + + obscuroGwContainer := container.NewWalletExtensionContainerFromConfig(obscuroGatewayConf, testlog.Logger()) + go func() { + err := obscuroGwContainer.Start() + if err != nil { + fmt.Printf("error stopping WE - %s", err) + } + }() + + // wait for the msg bus contract to be deployed + time.Sleep(5 * time.Second) + + // make sure the server is ready to receive requests + gatewayAddressHTTP := fmt.Sprintf("http://%s:%d", obscuroGatewayConf.WalletExtensionHost, obscuroGatewayConf.WalletExtensionPortHTTP) + gatewayAddressWS := fmt.Sprintf("ws://%s:%d", obscuroGatewayConf.WalletExtensionHost, obscuroGatewayConf.WalletExtensionPortWS) + fmt.Println("gatewayAddressHTTP: ", gatewayAddressHTTP) + fmt.Println("gatewayAddressWS: ", gatewayAddressWS) + + // make sure the server is ready to receive requests + err := waitServerIsReady(gatewayAddressHTTP) + require.NoError(t, err) + + // Server is now ready and we can create requests + + // Create users + user0, err := NewUser([]wallet.Wallet{wallets.L2FaucetWallet}, gatewayAddressHTTP, gatewayAddressWS) + require.NoError(t, err) + fmt.Printf("Created user with UserID: %s\n", user0.UserID) + + user1, err := NewUser(wallets.SimObsWallets[0:2], gatewayAddressHTTP, gatewayAddressWS) + require.NoError(t, err) + fmt.Printf("Created user with UserID: %s\n", user1.UserID) + + user2, err := NewUser(wallets.SimObsWallets[2:4], gatewayAddressHTTP, gatewayAddressWS) + require.NoError(t, err) + fmt.Printf("Created user with UserID: %s\n", user2.UserID) + + // register all the accounts for that user + err = user0.RegisterAccounts() + require.NoError(t, err) + err = user1.RegisterAccounts() + require.NoError(t, err) + err = user2.RegisterAccounts() + require.NoError(t, err) + + // Transfer some funds to user1 and user2 wallets, because they need it to make transactions + var amountToTransfer int64 = 1_000_000_000_000_000_000 + // Transfer some funds to user1 and user2 wallets, because they need it to make transactions + _, err = TransferETHToAddress(user0.HTTPClient, user0.Wallets[0], user1.Wallets[0].Address(), amountToTransfer) + require.NoError(t, err) + time.Sleep(5 * time.Second) + _, err = TransferETHToAddress(user0.HTTPClient, user0.Wallets[0], user1.Wallets[1].Address(), amountToTransfer) + require.NoError(t, err) + _, err = TransferETHToAddress(user0.HTTPClient, user0.Wallets[0], user2.Wallets[0].Address(), amountToTransfer) + require.NoError(t, err) + _, err = TransferETHToAddress(user0.HTTPClient, user0.Wallets[0], user2.Wallets[1].Address(), amountToTransfer) + require.NoError(t, err) + + // Print balances of all registered accounts to check if all accounts have funds + err = user0.PrintUserAccountsBalances() + require.NoError(t, err) + err = user1.PrintUserAccountsBalances() + require.NoError(t, err) + err = user2.PrintUserAccountsBalances() + require.NoError(t, err) + + // User0 deploys a contract that will later emit events + bytecode := `60806040523480156200001157600080fd5b506040518060400160405280600381526020017f666f6f00000000000000000000000000000000000000000000000000000000008152506000908162000058919062000320565b506040518060400160405280600381526020017f666f6f0000000000000000000000000000000000000000000000000000000000815250600190816200009f919062000320565b5062000407565b600081519050919050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052604160045260246000fd5b7f4e487b7100000000000000000000000000000000000000000000000000000000600052602260045260246000fd5b600060028204905060018216806200012857607f821691505b6020821081036200013e576200013d620000e0565b5b50919050565b60008190508160005260206000209050919050565b60006020601f8301049050919050565b600082821b905092915050565b600060088302620001a87fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff8262000169565b620001b4868362000169565b95508019841693508086168417925050509392505050565b6000819050919050565b6000819050919050565b600062000201620001fb620001f584620001cc565b620001d6565b620001cc565b9050919050565b6000819050919050565b6200021d83620001e0565b620002356200022c8262000208565b84845462000176565b825550505050565b600090565b6200024c6200023d565b6200025981848462000212565b505050565b5b8181101562000281576200027560008262000242565b6001810190506200025f565b5050565b601f821115620002d0576200029a8162000144565b620002a58462000159565b81016020851015620002b5578190505b620002cd620002c48562000159565b8301826200025e565b50505b505050565b600082821c905092915050565b6000620002f560001984600802620002d5565b1980831691505092915050565b6000620003108383620002e2565b9150826002028217905092915050565b6200032b82620000a6565b67ffffffffffffffff811115620003475762000346620000b1565b5b6200035382546200010f565b6200036082828562000285565b600060209050601f83116001811462000398576000841562000383578287015190505b6200038f858262000302565b865550620003ff565b601f198416620003a88662000144565b60005b82811015620003d257848901518255600182019150602085019450602081019050620003ab565b86831015620003f25784890151620003ee601f891682620002e2565b8355505b6001600288020188555050505b505050505050565b6107ee80620004176000396000f3fe608060405234801561001057600080fd5b506004361061004c5760003560e01c8063368b877214610051578063c2d366581461006d578063c5ced0361461008b578063e21f37ce146100a7575b600080fd5b61006b600480360381019061006691906103e6565b6100c5565b005b610075610126565b60405161008291906104ae565b60405180910390f35b6100a560048036038101906100a091906103e6565b6101b4565b005b6100af6101fe565b6040516100bc91906104ae565b60405180910390f35b80600090816100d491906106e6565b503373ffffffffffffffffffffffffffffffffffffffff167fe31c2ad953ded70272b94617f9181f8cc33755f1b40f4c706660f6ee0dfb634a8260405161011b91906104ae565b60405180910390a250565b60018054610133906104ff565b80601f016020809104026020016040519081016040528092919081815260200182805461015f906104ff565b80156101ac5780601f10610181576101008083540402835291602001916101ac565b820191906000526020600020905b81548152906001019060200180831161018f57829003601f168201915b505050505081565b80600190816101c391906106e6565b507f4fcdf2659dcf2254d2bce07af2baaf0c6ddf6da052dd241b2445a2f6398ae575816040516101f391906104ae565b60405180910390a150565b6000805461020b906104ff565b80601f0160208091040260200160405190810160405280929190818152602001828054610237906104ff565b80156102845780601f1061025957610100808354040283529160200191610284565b820191906000526020600020905b81548152906001019060200180831161026757829003601f168201915b505050505081565b6000604051905090565b600080fd5b600080fd5b600080fd5b600080fd5b6000601f19601f8301169050919050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052604160045260246000fd5b6102f3826102aa565b810181811067ffffffffffffffff82111715610312576103116102bb565b5b80604052505050565b600061032561028c565b905061033182826102ea565b919050565b600067ffffffffffffffff821115610351576103506102bb565b5b61035a826102aa565b9050602081019050919050565b82818337600083830152505050565b600061038961038484610336565b61031b565b9050828152602081018484840111156103a5576103a46102a5565b5b6103b0848285610367565b509392505050565b600082601f8301126103cd576103cc6102a0565b5b81356103dd848260208601610376565b91505092915050565b6000602082840312156103fc576103fb610296565b5b600082013567ffffffffffffffff81111561041a5761041961029b565b5b610426848285016103b8565b91505092915050565b600081519050919050565b600082825260208201905092915050565b60005b8381101561046957808201518184015260208101905061044e565b60008484015250505050565b60006104808261042f565b61048a818561043a565b935061049a81856020860161044b565b6104a3816102aa565b840191505092915050565b600060208201905081810360008301526104c88184610475565b905092915050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052602260045260246000fd5b6000600282049050600182168061051757607f821691505b60208210810361052a576105296104d0565b5b50919050565b60008190508160005260206000209050919050565b60006020601f8301049050919050565b600082821b905092915050565b6000600883026105927fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff82610555565b61059c8683610555565b95508019841693508086168417925050509392505050565b6000819050919050565b6000819050919050565b60006105e36105de6105d9846105b4565b6105be565b6105b4565b9050919050565b6000819050919050565b6105fd836105c8565b610611610609826105ea565b848454610562565b825550505050565b600090565b610626610619565b6106318184846105f4565b505050565b5b818110156106555761064a60008261061e565b600181019050610637565b5050565b601f82111561069a5761066b81610530565b61067484610545565b81016020851015610683578190505b61069761068f85610545565b830182610636565b50505b505050565b600082821c905092915050565b60006106bd6000198460080261069f565b1980831691505092915050565b60006106d683836106ac565b9150826002028217905092915050565b6106ef8261042f565b67ffffffffffffffff811115610708576107076102bb565b5b61071282546104ff565b61071d828285610659565b600060209050601f831160018114610750576000841561073e578287015190505b61074885826106ca565b8655506107b0565b601f19841661075e86610530565b60005b8281101561078657848901518255600182019150602085019450602081019050610761565b868310156107a3578489015161079f601f8916826106ac565b8355505b6001600288020188555050505b50505050505056fea264697066735822122076146d8c796917af248ecb981f38094293788d92ad21704dc623fd8412cb9dc964736f6c63430008120033` + abiString := ` + [ + { + "inputs": [], + "stateMutability": "nonpayable", + "type": "constructor" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "string", + "name": "newMessage", + "type": "string" + } + ], + "name": "Message2Updated", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "string", + "name": "newMessage", + "type": "string" + }, + { + "indexed": true, + "internalType": "address", + "name": "sender", + "type": "address" + } + ], + "name": "MessageUpdatedWithAddress", + "type": "event" + }, + { + "inputs": [], + "name": "message", + "outputs": [ + { + "internalType": "string", + "name": "", + "type": "string" + } + ], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [], + "name": "message2", + "outputs": [ + { + "internalType": "string", + "name": "", + "type": "string" + } + ], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [ + { + "internalType": "string", + "name": "newMessage", + "type": "string" + } + ], + "name": "setMessage", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + }, + { + "inputs": [ + { + "internalType": "string", + "name": "newMessage", + "type": "string" + } + ], + "name": "setMessage2", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + } +] + ` + + _, contractAddress, err := DeploySmartContract(user0.HTTPClient, user0.Wallets[0], bytecode) + require.NoError(t, err) + fmt.Println("Deployed contract address: ", contractAddress) + + // contract abi + contractAbi, err := abi.JSON(strings.NewReader(abiString)) + require.NoError(t, err) + + // check if contract was deployed and call one of the implicit getter functions + // call getter for a message + resultMessage, err := getStringValueFromSmartContractGetter(contractAddress, contractAbi, "message", user1.HTTPClient) + require.NoError(t, err) + + // check if the value is the same as hardcoded in smart contract + hardcodedMessageValue := "foo" + assert.Equal(t, hardcodedMessageValue, resultMessage) + + // subscribe with all three users for all events in deployed contract + var user0logs []types.Log + var user1logs []types.Log + var user2logs []types.Log + subscribeToEvents([]gethcommon.Address{contractAddress}, nil, user0.WSClient, &user0logs) + subscribeToEvents([]gethcommon.Address{contractAddress}, nil, user1.WSClient, &user1logs) + subscribeToEvents([]gethcommon.Address{contractAddress}, nil, user2.WSClient, &user2logs) + + time.Sleep(time.Second) + + // user1 calls setMessage and setMessage2 on deployed smart contract with the account + // that was registered as the first in OG + user1MessageValue := "user1PrivateEvent" + // interact with smart contract and cause events to be emitted + _, err = InteractWithSmartContract(user1.HTTPClient, user1.Wallets[0], contractAbi, "setMessage", "user1PrivateEvent", contractAddress) + require.NoError(t, err) + _, err = InteractWithSmartContract(user1.HTTPClient, user1.Wallets[0], contractAbi, "setMessage2", "user1PublicEvent", contractAddress) + require.NoError(t, err) + + // check if value was changed in the smart contract with the interactions above + resultMessage, err = getStringValueFromSmartContractGetter(contractAddress, contractAbi, "message", user1.HTTPClient) + require.NoError(t, err) + assert.Equal(t, user1MessageValue, resultMessage) + + // user2 calls setMessage and setMessage2 on deployed smart contract with the account + // that was registered as the second in OG + user2MessageValue := "user2PrivateEvent" + // interact with smart contract and cause events to be emitted + _, err = InteractWithSmartContract(user2.HTTPClient, user2.Wallets[1], contractAbi, "setMessage", "user2PrivateEvent", contractAddress) + require.NoError(t, err) + _, err = InteractWithSmartContract(user2.HTTPClient, user2.Wallets[1], contractAbi, "setMessage2", "user2PublicEvent", contractAddress) + require.NoError(t, err) + + // check if value was changed in the smart contract with the interactions above + resultMessage, err = getStringValueFromSmartContractGetter(contractAddress, contractAbi, "message", user1.HTTPClient) + require.NoError(t, err) + assert.Equal(t, user2MessageValue, resultMessage) + + // wait a few seconds to be completely sure all events arrived + time.Sleep(time.Second * 3) + + // Assert the number of logs received by each client + // user0 should see two lifecycle events (1 for each interaction with setMessage2) + assert.Equal(t, 2, len(user0logs)) + // user1 should see three events (two lifecycle events - same as user0) and event with his interaction with setMessage + assert.Equal(t, 3, len(user1logs)) + // user2 should see three events (two lifecycle events - same as user0) and event with his interaction with setMessage + assert.Equal(t, 2, len(user2logs)) + + // Gracefully shutdown + err = obscuroGwContainer.Stop() + assert.NoError(t, err) +} + func transferRandomAddr(t *testing.T, client *ethclient.Client, w wallet.Wallet) common.TxHash { ctx := context.Background() toAddr := datagenerator.RandomAddress() @@ -163,7 +417,7 @@ func transferRandomAddr(t *testing.T, client *ethclient.Client, w wallet.Wallet) func createObscuroNetwork(t *testing.T, startPort int) *params.SimWallets { // Create the Obscuro network. numberOfNodes := 1 - wallets := params.NewSimWallets(1, numberOfNodes, integration.EthereumChainID, integration.ObscuroChainID) + wallets := params.NewSimWallets(5, numberOfNodes, integration.EthereumChainID, integration.ObscuroChainID) simParams := params.SimParams{ NumberOfNodes: numberOfNodes, AvgBlockDuration: 1 * time.Second, @@ -200,3 +454,170 @@ func waitServerIsReady(serverAddr string) error { } return fmt.Errorf("timed out before server was ready") } + +func ComputeContractAddress(sender gethcommon.Address, nonce uint64) (gethcommon.Address, error) { + // RLP encode the byte array of the sender's address and nonce + encoded, err := rlp.EncodeToBytes([]interface{}{sender, nonce}) + if err != nil { + return gethcommon.Address{}, err + } + // Compute the Keccak-256 hash of the RLP encoded byte array + hash := crypto.Keccak256(encoded) + + // The contract address is the last 20 bytes of this hash + return gethcommon.BytesToAddress(hash[len(hash)-20:]), nil +} + +func TransferETHToAddress(client *ethclient.Client, wallet wallet.Wallet, toAddress gethcommon.Address, amount int64) (*types.Receipt, error) { + transferTx1 := types.LegacyTx{ + Nonce: wallet.GetNonceAndIncrement(), + To: &toAddress, + Value: big.NewInt(amount), + Gas: uint64(1_000_000), + GasPrice: gethcommon.Big1, + Data: nil, + } + signedTx, err := wallet.SignTransaction(&transferTx1) + if err != nil { + return nil, err + } + err = client.SendTransaction(context.Background(), signedTx) + if err != nil { + return nil, err + } + return AwaitTransactionReceipt(context.Background(), client, signedTx.Hash(), 2*time.Second) +} + +func DeploySmartContract(client *ethclient.Client, wallet wallet.Wallet, bytecode string) (*types.Receipt, gethcommon.Address, error) { + contractNonce := wallet.GetNonceAndIncrement() + contractTx := types.LegacyTx{ + Nonce: contractNonce, + Gas: uint64(1_000_000), + GasPrice: gethcommon.Big1, + Data: gethcommon.FromHex(bytecode), + } + + signedTx, err := wallet.SignTransaction(&contractTx) + if err != nil { + return nil, gethcommon.Address{}, err + } + + err = client.SendTransaction(context.Background(), signedTx) + if err != nil { + return nil, gethcommon.Address{}, err + } + + // await for the transaction to be included in a block + txReceiptContractCreation, err := AwaitTransactionReceipt(context.Background(), client, signedTx.Hash(), 2*time.Second) + if err != nil { + return nil, gethcommon.Address{}, err + } + + // get contract address + contractAddress, err := ComputeContractAddress(wallet.Address(), contractNonce) + if err != nil { + return nil, gethcommon.Address{}, err + } + + return txReceiptContractCreation, contractAddress, nil +} + +func InteractWithSmartContract(client *ethclient.Client, wallet wallet.Wallet, contractAbi abi.ABI, methodName string, methodParam string, contractAddress gethcommon.Address) (*types.Receipt, error) { + contractInteractionData, err := contractAbi.Pack(methodName, methodParam) + if err != nil { + return nil, err + } + + interactionTx := types.LegacyTx{ + Nonce: wallet.GetNonceAndIncrement(), + To: &contractAddress, + Gas: uint64(1_000_000), + GasPrice: gethcommon.Big1, + Data: contractInteractionData, + } + signedTx, err := wallet.SignTransaction(&interactionTx) + if err != nil { + return nil, err + } + err = client.SendTransaction(context.Background(), signedTx) + if err != nil { + return nil, err + } + + txReceipt, err := AwaitTransactionReceipt(context.Background(), client, signedTx.Hash(), 2*time.Second) + if err != nil { + return nil, err + } + + return txReceipt, nil +} + +func getStringValueFromSmartContractGetter(contractAddress gethcommon.Address, contractAbi abi.ABI, method string, client *ethclient.Client) (string, error) { + contract := bind.NewBoundContract(contractAddress, contractAbi, client, client, client) + var resultMessage string + callOpts := &bind.CallOpts{} + results := make([]interface{}, 1) + results[0] = &resultMessage + err := contract.Call(callOpts, &results, method) + if err != nil { + return "", err + } + + return resultMessage, nil +} + +func AwaitTransactionReceipt(ctx context.Context, client *ethclient.Client, txHash gethcommon.Hash, timeout time.Duration) (*types.Receipt, error) { + timeoutStrategy := retry.NewTimeoutStrategy(timeout, time.Second) + return AwaitTransactionReceiptWithRetryStrategy(ctx, client, txHash, timeoutStrategy) +} + +func AwaitTransactionReceiptWithRetryStrategy(ctx context.Context, client *ethclient.Client, txHash gethcommon.Hash, retryStrategy retry.Strategy) (*types.Receipt, error) { + retryStrategy.Reset() + for { + select { + case <-ctx.Done(): + return nil, fmt.Errorf("context cancelled before receipt was received") + case <-time.After(retryStrategy.NextRetryInterval()): + receipt, err := client.TransactionReceipt(ctx, txHash) + if err == nil { + return receipt, nil + } + if retryStrategy.Done() { + return nil, fmt.Errorf("receipt not found - %s - %w", retryStrategy.Summary(), err) + } + } + } +} + +func subscribeToEvents(addresses []gethcommon.Address, topics [][]gethcommon.Hash, client *ethclient.Client, logs *[]types.Log) { + // Make a subscription + filterQuery := ethereum.FilterQuery{ + Addresses: addresses, + FromBlock: big.NewInt(0), // todo (@ziga) - without those we get errors - fix that and make them configurable + ToBlock: big.NewInt(10000), + Topics: topics, + } + logsCh := make(chan types.Log) + + subscription, err := client.SubscribeFilterLogs(context.Background(), filterQuery, logsCh) + if err != nil { + fmt.Printf("Failed to subscribe to filter logs: %v\n", err) + } + // todo (@ziga) - unsubscribe when it is fixed... + // defer subscription.Unsubscribe() // cleanup + + // Listen for logs in a goroutine + go func() { + for { + select { + case err := <-subscription.Err(): + fmt.Printf("Error from logs subscription: %v\n", err) + return + case log := <-logsCh: + // append logs to be visible from the main thread + *logs = append(*logs, log) + } + } + }() + +} diff --git a/tools/walletextension/accountmanager/account_manager.go b/tools/walletextension/accountmanager/account_manager.go index 235acdd1f7..a28e35de38 100644 --- a/tools/walletextension/accountmanager/account_manager.go +++ b/tools/walletextension/accountmanager/account_manager.go @@ -1,14 +1,13 @@ package accountmanager import ( - "context" "encoding/json" "errors" "fmt" "strings" - "time" "github.com/ethereum/go-ethereum/eth/filters" + "github.com/obscuronet/go-obscuro/tools/walletextension/subscriptions" "github.com/obscuronet/go-obscuro/go/common/gethencoding" @@ -18,9 +17,6 @@ import ( wecommon "github.com/obscuronet/go-obscuro/tools/walletextension/common" - "github.com/go-kit/kit/transport/http/jsonrpc" - - "github.com/obscuronet/go-obscuro/go/common/log" "github.com/obscuronet/go-obscuro/go/rpc" "github.com/obscuronet/go-obscuro/tools/walletextension/userconn" @@ -28,8 +24,6 @@ import ( ) const ( - methodEthSubscription = "eth_subscription" - ethCallPaddedArgLen = 64 ethCallAddrPadding = "000000000000000000000000" @@ -41,15 +35,17 @@ const ( type AccountManager struct { unauthedClient rpc.Client // todo (@ziga) - create two types of clients - WS clients, and HTTP clients - to not create WS clients unnecessarily. - accountClients map[gethcommon.Address]*rpc.EncRPCClient // An encrypted RPC client per registered account - logger gethlog.Logger + accountClients map[gethcommon.Address]*rpc.EncRPCClient // An encrypted RPC client per registered account + subscriptionsManager *subscriptions.SubscriptionManager + logger gethlog.Logger } func NewAccountManager(unauthedClient rpc.Client, logger gethlog.Logger) *AccountManager { return &AccountManager{ - unauthedClient: unauthedClient, - accountClients: make(map[gethcommon.Address]*rpc.EncRPCClient), - logger: logger, + unauthedClient: unauthedClient, + accountClients: make(map[gethcommon.Address]*rpc.EncRPCClient), + subscriptionsManager: subscriptions.New(logger), + logger: logger, } } @@ -60,18 +56,18 @@ func (m *AccountManager) AddClient(address gethcommon.Address, client *rpc.EncRP // ProxyRequest tries to identify the correct EncRPCClient to proxy the request to the Obscuro node, or it will attempt // the request with all clients until it succeeds -func (m *AccountManager) ProxyRequest(rpcReq *RPCRequest, rpcResp *interface{}, userConn userconn.UserConn) error { +func (m *AccountManager) ProxyRequest(rpcReq *wecommon.RPCRequest, rpcResp *interface{}, userConn userconn.UserConn) error { if rpcReq.Method == rpc.Subscribe { clients, err := m.suggestSubscriptionClient(rpcReq) if err != nil { return err } - // fetch the clients from a topic - for _, client := range clients { - return m.executeSubscribe(client, rpcReq, rpcResp, userConn) + err = m.subscriptionsManager.HandleNewSubscriptions(clients, rpcReq, rpcResp, userConn) + if err != nil { + m.logger.Error("Error subscribing to multiple clients") } + return err } - return m.executeCall(rpcReq, rpcResp) } @@ -79,7 +75,7 @@ const emptyFilterCriteria = "[]" // This is the value that gets passed for an em // determine the client based on the topics // if none is found use all clients from current user -func (m *AccountManager) suggestSubscriptionClient(rpcReq *RPCRequest) ([]rpc.Client, error) { +func (m *AccountManager) suggestSubscriptionClient(rpcReq *wecommon.RPCRequest) ([]rpc.Client, error) { clients := make([]rpc.Client, 0, len(m.accountClients)) // by default, if no client is identified as a candidate, then subscribe to all accounts @@ -128,7 +124,7 @@ func (m *AccountManager) suggestSubscriptionClient(rpcReq *RPCRequest) ([]rpc.Cl return clients, nil } -func (m *AccountManager) executeCall(rpcReq *RPCRequest, rpcResp *interface{}) error { +func (m *AccountManager) executeCall(rpcReq *wecommon.RPCRequest, rpcResp *interface{}) error { // for obscuro RPC requests it is important we know the sender account for the viewing key encryption/decryption suggestedClient := m.suggestAccountClient(rpcReq, m.accountClients) @@ -160,7 +156,7 @@ func (m *AccountManager) executeCall(rpcReq *RPCRequest, rpcResp *interface{}) e } // suggestAccountClient works through various methods to try and guess which available client to use for a request, returns nil if none found -func (m *AccountManager) suggestAccountClient(req *RPCRequest, accClients map[gethcommon.Address]*rpc.EncRPCClient) *rpc.EncRPCClient { +func (m *AccountManager) suggestAccountClient(req *wecommon.RPCRequest, accClients map[gethcommon.Address]*rpc.EncRPCClient) *rpc.EncRPCClient { if len(accClients) == 1 { for _, client := range accClients { // return the first (and only) client @@ -275,66 +271,7 @@ func searchDataFieldForAccount(callParams map[string]interface{}, accClients map return nil, fmt.Errorf("no known account found in data bytes") } -func (m *AccountManager) executeSubscribe(client rpc.Client, req *RPCRequest, resp *interface{}, userConn userconn.UserConn) error { //nolint: gocognit - if len(req.Params) == 0 { - return fmt.Errorf("could not subscribe as no subscription namespace was provided") - } - m.logger.Info(fmt.Sprintf("Subscribing client: %s for request: %s", client, req)) - ch := make(chan common.IDAndLog) - subscription, err := client.Subscribe(context.Background(), resp, rpc.SubscribeNamespace, ch, req.Params...) - if err != nil { - return fmt.Errorf("could not call %s with params %v. Cause: %w", req.Method, req.Params, err) - } - - // We listen for incoming messages on the subscription. - go func() { - for { - select { - case idAndLog := <-ch: - if userConn.IsClosed() { - m.logger.Info("received log but websocket was closed on subscription", log.SubIDKey, idAndLog.SubID) - return - } - - jsonResponse, err := prepareLogResponse(idAndLog) - if err != nil { - m.logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) - continue - } - - m.logger.Trace(fmt.Sprintf("Forwarding log from Obscuro node: %s", jsonResponse), log.SubIDKey, idAndLog.SubID) - err = userConn.WriteResponse(jsonResponse) - if err != nil { - m.logger.Error("could not write the JSON log to the websocket on subscription %", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) - continue - } - - case err = <-subscription.Err(): - // An error on this channel means the subscription has ended, so we exit the loop. - if userConn != nil && err != nil { - userConn.HandleError(err.Error()) - } - - return - } - } - }() - - // We periodically check if the websocket is closed, and terminate the subscription. - go func() { - for { - if userConn.IsClosed() { - subscription.Unsubscribe() - return - } - time.Sleep(100 * time.Millisecond) - } - }() - - return nil -} - -func submitCall(client *rpc.EncRPCClient, req *RPCRequest, resp *interface{}) error { +func submitCall(client *rpc.EncRPCClient, req *wecommon.RPCRequest, resp *interface{}) error { if req.Method == rpc.Call || req.Method == rpc.EstimateGas { // Never modify the original request, as it might be reused. req = req.Clone() @@ -390,36 +327,3 @@ func setFromFieldIfMissing(args []interface{}, account gethcommon.Address) ([]in return request, nil } - -// Formats the log to be sent as an Eth JSON-RPC response. -func prepareLogResponse(idAndLog common.IDAndLog) ([]byte, error) { - paramsMap := make(map[string]interface{}) - paramsMap[wecommon.JSONKeySubscription] = idAndLog.SubID - paramsMap[wecommon.JSONKeyResult] = idAndLog.Log - - respMap := make(map[string]interface{}) - respMap[wecommon.JSONKeyRPCVersion] = jsonrpc.Version - respMap[wecommon.JSONKeyMethod] = methodEthSubscription - respMap[wecommon.JSONKeyParams] = paramsMap - - jsonResponse, err := json.Marshal(respMap) - if err != nil { - return nil, fmt.Errorf("could not marshal log response to JSON. Cause: %w", err) - } - return jsonResponse, nil -} - -type RPCRequest struct { - ID json.RawMessage - Method string - Params []interface{} -} - -// Clone returns a new instance of the *RPCRequest -func (r *RPCRequest) Clone() *RPCRequest { - return &RPCRequest{ - ID: r.ID, - Method: r.Method, - Params: r.Params, - } -} diff --git a/tools/walletextension/api/utils.go b/tools/walletextension/api/utils.go index 3bca5e921a..546cb4bae1 100644 --- a/tools/walletextension/api/utils.go +++ b/tools/walletextension/api/utils.go @@ -5,12 +5,11 @@ import ( "fmt" "strings" - "github.com/obscuronet/go-obscuro/tools/walletextension/accountmanager" "github.com/obscuronet/go-obscuro/tools/walletextension/common" "github.com/obscuronet/go-obscuro/tools/walletextension/userconn" ) -func parseRequest(body []byte) (*accountmanager.RPCRequest, error) { +func parseRequest(body []byte) (*common.RPCRequest, error) { // We unmarshal the JSON request var reqJSONMap map[string]json.RawMessage err := json.Unmarshal(body, &reqJSONMap) @@ -33,7 +32,7 @@ func parseRequest(body []byte) (*accountmanager.RPCRequest, error) { return nil, fmt.Errorf("could not unmarshal params list from JSON-RPC request body: %w", err) } - return &accountmanager.RPCRequest{ + return &common.RPCRequest{ ID: reqID, Method: method, Params: params, diff --git a/tools/walletextension/common/common.go b/tools/walletextension/common/common.go index ab59aaff35..0edf1f1a20 100644 --- a/tools/walletextension/common/common.go +++ b/tools/walletextension/common/common.go @@ -3,10 +3,14 @@ package common import ( "crypto/ecdsa" "encoding/hex" + "encoding/json" "errors" "fmt" "regexp" + "github.com/go-kit/kit/transport/http/jsonrpc" + "github.com/obscuronet/go-obscuro/go/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/ecies" "github.com/obscuronet/go-obscuro/go/common/viewingkey" @@ -36,7 +40,7 @@ func BytesToPrivateKey(keyBytes []byte) (*ecies.PrivateKey, error) { return eciesPrivateKey, nil } -// CalculateUserID calculates userID from public key +// CalculateUserID calculates userID from a public key func CalculateUserID(publicKeyBytes []byte) []byte { return crypto.Keccak256Hash(publicKeyBytes).Bytes() } @@ -91,3 +95,37 @@ func CreateEncClient( } return encClient, nil } + +type RPCRequest struct { + ID json.RawMessage + Method string + Params []interface{} +} + +// Clone returns a new instance of the *RPCRequest +func (r *RPCRequest) Clone() *RPCRequest { + return &RPCRequest{ + ID: r.ID, + Method: r.Method, + Params: r.Params, + } +} + +// Formats the log to be sent as an Eth JSON-RPC response. +// TODO (@ziga) - Move this code to a subscriptions package once it is used only there.. +func PrepareLogResponse(idAndLog common.IDAndLog) ([]byte, error) { + paramsMap := make(map[string]interface{}) + paramsMap[JSONKeySubscription] = idAndLog.SubID + paramsMap[JSONKeyResult] = idAndLog.Log + + respMap := make(map[string]interface{}) + respMap[JSONKeyRPCVersion] = jsonrpc.Version + respMap[JSONKeyMethod] = methodEthSubscription + respMap[JSONKeyParams] = paramsMap + + jsonResponse, err := json.Marshal(respMap) + if err != nil { + return nil, fmt.Errorf("could not marshal log response to JSON. Cause: %w", err) + } + return jsonResponse, nil +} diff --git a/tools/walletextension/common/constants.go b/tools/walletextension/common/constants.go index 6c51c03202..e9e92a7603 100644 --- a/tools/walletextension/common/constants.go +++ b/tools/walletextension/common/constants.go @@ -50,6 +50,7 @@ const ( GetStorageAtUserIDRequestMethodName = "getUserID" SuccessMsg = "success" APIVersion1 = "/v1" + methodEthSubscription = "eth_subscription" PathVersion = "/version/" ) diff --git a/tools/walletextension/subscriptions/subscriptions.go b/tools/walletextension/subscriptions/subscriptions.go new file mode 100644 index 0000000000..7b5ae01b8f --- /dev/null +++ b/tools/walletextension/subscriptions/subscriptions.go @@ -0,0 +1,120 @@ +package subscriptions + +import ( + "context" + "fmt" + "time" + + gethlog "github.com/ethereum/go-ethereum/log" + gethrpc "github.com/ethereum/go-ethereum/rpc" + "github.com/obscuronet/go-obscuro/go/common" + "github.com/obscuronet/go-obscuro/go/common/log" + "github.com/obscuronet/go-obscuro/go/rpc" + wecommon "github.com/obscuronet/go-obscuro/tools/walletextension/common" + "github.com/obscuronet/go-obscuro/tools/walletextension/userconn" +) + +type SubscriptionManager struct { + subscriptionMappings map[string][]string + logger gethlog.Logger +} + +func New(logger gethlog.Logger) *SubscriptionManager { + return &SubscriptionManager{ + subscriptionMappings: make(map[string][]string), + logger: logger, + } +} + +// HandleNewSubscriptions subscribes to an event with all the clients provided. +// Doing this is necessary because we have relevancy rule, and we want to subscribe sometimes with all clients to get all the events +func (sm *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req *wecommon.RPCRequest, resp *interface{}, userConn userconn.UserConn) error { + if len(req.Params) == 0 { + return fmt.Errorf("could not subscribe as no subscription namespace was provided") + } + + sm.logger.Info(fmt.Sprintf("Subscribing to event %s with %d clients", req.Params, len(clients))) + + // create a common channel for subscriptions from all clients + funnelMultipleAccountsChan := make(chan common.IDAndLog) + + // read from a multiple accounts channel and write results to userConn + go readFromChannelAndWriteToUserConn(funnelMultipleAccountsChan, userConn, sm.logger) + + // iterate over all clients and subscribe for each of them + for _, client := range clients { + // fmt.Println("Subscribing with client: ", client) + subscription, err := client.Subscribe(context.Background(), resp, rpc.SubscribeNamespace, funnelMultipleAccountsChan, req.Params...) + if err != nil { + return fmt.Errorf("could not call %s with params %v. Cause: %w", req.Method, req.Params, err) + } + + // Add map subscriptionIDs + if currentNodeSubscriptionID, ok := (*resp).(string); ok { + // TODO (@ziga): Currently we use the same value for node and user subscriptionID - this will change after + // subscribing with multiple accounts + sm.UpdateSubscriptionMapping(currentNodeSubscriptionID, currentNodeSubscriptionID) + } + + // We periodically check if the websocket is closed, and terminate the subscription. + go checkIfUserConnIsClosedAndUnsubscribe(userConn, subscription) + + return nil + // TODO (@ziga) + // At this stage we want to use only the first account - same as before + // introduce subscribing with all accounts in another PR ) + } + return nil +} + +func readFromChannelAndWriteToUserConn(channel chan common.IDAndLog, userConn userconn.UserConn, logger gethlog.Logger) { + for { + select { + case data := <-channel: + jsonResponse, err := wecommon.PrepareLogResponse(data) + if err != nil { + logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, data.SubID, log.ErrKey, err) + continue + } + + logger.Trace(fmt.Sprintf("Forwarding log from Obscuro node: %s", jsonResponse), log.SubIDKey, data.SubID) + err = userConn.WriteResponse(jsonResponse) + if err != nil { + logger.Error("could not write the JSON log to the websocket on subscription %", log.SubIDKey, data.SubID, log.ErrKey, err) + continue + } + } + } +} + +func checkIfUserConnIsClosedAndUnsubscribe(userConn userconn.UserConn, subscription *gethrpc.ClientSubscription) { + for { + if userConn.IsClosed() { + subscription.Unsubscribe() + return + } + time.Sleep(100 * time.Millisecond) + } +} + +func (sm *SubscriptionManager) UpdateSubscriptionMapping(userSubscriptionID string, obscuroNodeSubscriptionID string) { + existingUserIDs, exists := sm.subscriptionMappings[userSubscriptionID] + + if !exists { + sm.subscriptionMappings[userSubscriptionID] = []string{obscuroNodeSubscriptionID} + return + } + + // Check if obscuroNodeSubscriptionID already exists to avoid duplication + alreadyExists := false + for _, existingID := range existingUserIDs { + if obscuroNodeSubscriptionID == existingID { + alreadyExists = true + break + } + } + + if !alreadyExists { + sm.subscriptionMappings[userSubscriptionID] = append(existingUserIDs, obscuroNodeSubscriptionID) + } +} diff --git a/tools/walletextension/wallet_extension.go b/tools/walletextension/wallet_extension.go index 4b219207ef..b78ba62e8f 100644 --- a/tools/walletextension/wallet_extension.go +++ b/tools/walletextension/wallet_extension.go @@ -18,7 +18,6 @@ import ( "github.com/obscuronet/go-obscuro/go/common/stopcontrol" "github.com/obscuronet/go-obscuro/go/common/viewingkey" "github.com/obscuronet/go-obscuro/go/rpc" - "github.com/obscuronet/go-obscuro/tools/walletextension/accountmanager" "github.com/obscuronet/go-obscuro/tools/walletextension/common" "github.com/obscuronet/go-obscuro/tools/walletextension/storage" "github.com/obscuronet/go-obscuro/tools/walletextension/userconn" @@ -70,7 +69,7 @@ func (w *WalletExtension) Logger() gethlog.Logger { } // ProxyEthRequest proxys an incoming user request to the enclave -func (w *WalletExtension) ProxyEthRequest(request *accountmanager.RPCRequest, conn userconn.UserConn, hexUserID string) (map[string]interface{}, error) { +func (w *WalletExtension) ProxyEthRequest(request *common.RPCRequest, conn userconn.UserConn, hexUserID string) (map[string]interface{}, error) { response := map[string]interface{}{} // all responses must contain the request id. Both successful and unsuccessful. response[common.JSONKeyRPCVersion] = jsonrpc.Version @@ -378,7 +377,7 @@ func adjustStateRoot(rpcResp interface{}, respMap map[string]interface{}) { // getStorageAtInterceptor checks if the parameters for getStorageAt are set to values that require interception // and return response or nil if the gateway should forward the request to the node. -func (w *WalletExtension) getStorageAtInterceptor(request *accountmanager.RPCRequest, hexUserID string) map[string]interface{} { +func (w *WalletExtension) getStorageAtInterceptor(request *common.RPCRequest, hexUserID string) map[string]interface{} { // check if parameters are correct, and we can intercept a request, otherwise return nil if w.checkParametersForInterceptedGetStorageAt(request.Params) { // check if userID in the parameters is also in our database