From 31040009231e24eaef9577332690f77233193d27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=BDiga=20Kokelj?= Date: Thu, 19 Oct 2023 17:56:27 +0200 Subject: [PATCH] prepare for multiple subscriptions and add test --- .../obscurogateway/events_contract.sol | 31 ++ integration/obscurogateway/gateway_user.go | 121 +++++ .../obscurogateway/obscurogateway_test.go | 423 +++++++++++++++++- .../accountmanager/account_manager.go | 69 +-- .../subscriptions/subscriptions.go | 147 +++--- 5 files changed, 644 insertions(+), 147 deletions(-) create mode 100644 integration/obscurogateway/events_contract.sol create mode 100644 integration/obscurogateway/gateway_user.go diff --git a/integration/obscurogateway/events_contract.sol b/integration/obscurogateway/events_contract.sol new file mode 100644 index 0000000000..5ee90be5c2 --- /dev/null +++ b/integration/obscurogateway/events_contract.sol @@ -0,0 +1,31 @@ +// SPDX-License-Identifier: MIT +// Specify the Solidity version +pragma solidity ^0.8.0; + +contract SimpleMessageContract { + + // State variable to store the message + string public message; + string public message2; + + // Event declaration + event MessageUpdatedWithAddress(string newMessage, address indexed sender); + event Message2Updated(string newMessage); + + // Constructor to initialize the message + constructor() { + message = "foo"; + message2 = "foo"; + } + + // Function to set a new message + function setMessage(string memory newMessage) public { + message = newMessage; + emit MessageUpdatedWithAddress(newMessage, msg.sender); // Emit the event (only sender can see it) + } + + function setMessage2(string memory newMessage) public { + message2 = newMessage; + emit Message2Updated(newMessage); // Emit the event (everyone can see it) + } +} \ No newline at end of file diff --git a/integration/obscurogateway/gateway_user.go b/integration/obscurogateway/gateway_user.go new file mode 100644 index 0000000000..660b497e18 --- /dev/null +++ b/integration/obscurogateway/gateway_user.go @@ -0,0 +1,121 @@ +package faucet + +import ( + "context" + "crypto/ecdsa" + "encoding/hex" + "fmt" + "io" + "net/http" + "strings" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/obscuronet/go-obscuro/go/wallet" + "github.com/valyala/fasthttp" +) + +type GatewayUser struct { + UserID string + Wallets []wallet.Wallet + HTTPClient *ethclient.Client + WSClient *ethclient.Client + ServerAddressHTTP string + ServerAddressWS string +} + +func NewUser(wallets []wallet.Wallet, serverAddressHTTP string, serverAddressWS string) (*GatewayUser, error) { + // automatically join OG + userID, err := joinObscuroGateway(serverAddressHTTP) + if err != nil { + return nil, err + } + + // create clients + httpClient, err := ethclient.Dial(serverAddressHTTP + "/v1/" + "?u=" + userID) + if err != nil { + return nil, err + } + wsClient, err := ethclient.Dial(serverAddressWS + "/v1/" + "?u=" + userID) + if err != nil { + return nil, err + } + + return &GatewayUser{ + UserID: userID, + Wallets: wallets, + HTTPClient: httpClient, + WSClient: wsClient, + ServerAddressHTTP: serverAddressHTTP, + ServerAddressWS: serverAddressWS, + }, nil +} + +func (u GatewayUser) RegisterAccounts() error { + for _, w := range u.Wallets { + response, err := registerAccount(u.ServerAddressHTTP, u.UserID, w.PrivateKey(), w.Address().Hex()) + if err != nil { + return err + } + fmt.Printf("Successfully registered address %s for user: %s with response: %s \n", w.Address().Hex(), u.UserID, response) + } + + return nil +} + +func (u GatewayUser) PrintUserAccountsBalances() error { + for _, w := range u.Wallets { + balance, err := u.HTTPClient.BalanceAt(context.Background(), w.Address(), nil) + if err != nil { + return err + } + fmt.Println("Balance for account ", w.Address().Hex(), " - ", balance.String()) + } + return nil +} + +func registerAccount(url string, userID string, pk *ecdsa.PrivateKey, hexAddress string) ([]byte, error) { + payload := prepareRegisterPayload(userID, pk, hexAddress) + + req, err := http.NewRequestWithContext( + context.Background(), + http.MethodPost, + url+"/v1/authenticate/?u="+userID, + strings.NewReader(payload), + ) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json; charset=UTF-8") + + client := &http.Client{} + response, err := client.Do(req) + if err != nil { + return nil, err + } + + defer response.Body.Close() + return io.ReadAll(response.Body) +} + +func prepareRegisterPayload(userID string, pk *ecdsa.PrivateKey, hexAddress string) string { + message := fmt.Sprintf("Register %s for %s", userID, strings.ToLower(hexAddress)) + prefixedMessage := fmt.Sprintf("\u0019Ethereum Signed Message:\n%d%s", len(message), message) + messageHash := crypto.Keccak256([]byte(prefixedMessage)) + sig, err := crypto.Sign(messageHash, pk) + if err != nil { + fmt.Printf("Failed to sign message: %v\n", err) + } + sig[64] += 27 + signature := "0x" + hex.EncodeToString(sig) + payload := fmt.Sprintf("{\"signature\": \"%s\", \"message\": \"%s\"}", signature, message) + return payload +} + +func joinObscuroGateway(url string) (string, error) { + statusCode, userID, err := fasthttp.Get(nil, fmt.Sprintf("%s/v1/join/", url)) + if err != nil || statusCode != 200 { + return "", fmt.Errorf(fmt.Sprintf("Failed to get userID. Status code: %d, err: %s", statusCode, err)) + } + return string(userID), nil +} diff --git a/integration/obscurogateway/obscurogateway_test.go b/integration/obscurogateway/obscurogateway_test.go index f6cd9e3862..922d587622 100644 --- a/integration/obscurogateway/obscurogateway_test.go +++ b/integration/obscurogateway/obscurogateway_test.go @@ -3,6 +3,12 @@ package faucet import ( "context" "fmt" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/rlp" + "github.com/obscuronet/go-obscuro/go/common/retry" "math/big" "net/http" "strings" @@ -105,6 +111,254 @@ func TestObscuroGateway(t *testing.T) { assert.NoError(t, err) } +func TestObscuroGatewaySubscriptionsWithMultipleAccounts(t *testing.T) { + // t.Skip("Commented it out until more testing is driven from this test") + startPort := integration.StartPortObscuroGatewayUnitTest + wallets := createObscuroNetwork(t, startPort) + + obscuroGatewayConf := config.Config{ + WalletExtensionHost: "127.0.0.1", + WalletExtensionPortHTTP: startPort + integration.DefaultObscuroGatewayHTTPPortOffset, + WalletExtensionPortWS: startPort + integration.DefaultObscuroGatewayWSPortOffset, + NodeRPCHTTPAddress: fmt.Sprintf("127.0.0.1:%d", startPort+integration.DefaultHostRPCHTTPOffset), + NodeRPCWebsocketAddress: fmt.Sprintf("127.0.0.1:%d", startPort+integration.DefaultHostRPCWSOffset), + LogPath: "sys_out", + VerboseFlag: false, + DBType: "sqlite", + } + + obscuroGwContainer := container.NewWalletExtensionContainerFromConfig(obscuroGatewayConf, testlog.Logger()) + go func() { + err := obscuroGwContainer.Start() + if err != nil { + fmt.Printf("error stopping WE - %s", err) + } + }() + + // wait for the msg bus contract to be deployed + time.Sleep(5 * time.Second) + + // make sure the server is ready to receive requests + gatewayAddressHTTP := fmt.Sprintf("http://%s:%d", obscuroGatewayConf.WalletExtensionHost, obscuroGatewayConf.WalletExtensionPortHTTP) + gatewayAddressWS := fmt.Sprintf("ws://%s:%d", obscuroGatewayConf.WalletExtensionHost, obscuroGatewayConf.WalletExtensionPortWS) + fmt.Println("gatewayAddressHTTP: ", gatewayAddressHTTP) + fmt.Println("gatewayAddressWS: ", gatewayAddressWS) + + // make sure the server is ready to receive requests + err := waitServerIsReady(gatewayAddressHTTP) + require.NoError(t, err) + + // Server is now ready and we can create requests + + // Create users + user0, err := NewUser([]wallet.Wallet{wallets.L2FaucetWallet}, gatewayAddressHTTP, gatewayAddressWS) + require.NoError(t, err) + fmt.Printf("Created user with UserID: %s\n", user0.UserID) + + user1, err := NewUser(wallets.SimObsWallets[0:2], gatewayAddressHTTP, gatewayAddressWS) + require.NoError(t, err) + fmt.Printf("Created user with UserID: %s\n", user1.UserID) + + user2, err := NewUser(wallets.SimObsWallets[2:4], gatewayAddressHTTP, gatewayAddressWS) + require.NoError(t, err) + fmt.Printf("Created user with UserID: %s\n", user2.UserID) + + // register all the accounts for that user + err = user0.RegisterAccounts() + require.NoError(t, err) + err = user1.RegisterAccounts() + require.NoError(t, err) + err = user2.RegisterAccounts() + require.NoError(t, err) + + // Transfer some funds to user1 and user2 wallets, because they need it to make transactions + var amountToTransfer int64 = 1_000_000_000_000_000_000 + // Transfer some funds to user1 and user2 wallets, because they need it to make transactions + _, err = TransferETHToAddress(user0.HTTPClient, user0.Wallets[0], user1.Wallets[0].Address(), amountToTransfer) + require.NoError(t, err) + time.Sleep(5 * time.Second) + _, err = TransferETHToAddress(user0.HTTPClient, user0.Wallets[0], user1.Wallets[1].Address(), amountToTransfer) + require.NoError(t, err) + _, err = TransferETHToAddress(user0.HTTPClient, user0.Wallets[0], user2.Wallets[0].Address(), amountToTransfer) + require.NoError(t, err) + _, err = TransferETHToAddress(user0.HTTPClient, user0.Wallets[0], user2.Wallets[1].Address(), amountToTransfer) + require.NoError(t, err) + + // Print balances of all registered accounts to check if all accounts have funds + err = user0.PrintUserAccountsBalances() + require.NoError(t, err) + err = user1.PrintUserAccountsBalances() + require.NoError(t, err) + err = user2.PrintUserAccountsBalances() + require.NoError(t, err) + + // User0 deploys a contract that will later emit events + bytecode := `60806040523480156200001157600080fd5b506040518060400160405280600381526020017f666f6f00000000000000000000000000000000000000000000000000000000008152506000908162000058919062000320565b506040518060400160405280600381526020017f666f6f0000000000000000000000000000000000000000000000000000000000815250600190816200009f919062000320565b5062000407565b600081519050919050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052604160045260246000fd5b7f4e487b7100000000000000000000000000000000000000000000000000000000600052602260045260246000fd5b600060028204905060018216806200012857607f821691505b6020821081036200013e576200013d620000e0565b5b50919050565b60008190508160005260206000209050919050565b60006020601f8301049050919050565b600082821b905092915050565b600060088302620001a87fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff8262000169565b620001b4868362000169565b95508019841693508086168417925050509392505050565b6000819050919050565b6000819050919050565b600062000201620001fb620001f584620001cc565b620001d6565b620001cc565b9050919050565b6000819050919050565b6200021d83620001e0565b620002356200022c8262000208565b84845462000176565b825550505050565b600090565b6200024c6200023d565b6200025981848462000212565b505050565b5b8181101562000281576200027560008262000242565b6001810190506200025f565b5050565b601f821115620002d0576200029a8162000144565b620002a58462000159565b81016020851015620002b5578190505b620002cd620002c48562000159565b8301826200025e565b50505b505050565b600082821c905092915050565b6000620002f560001984600802620002d5565b1980831691505092915050565b6000620003108383620002e2565b9150826002028217905092915050565b6200032b82620000a6565b67ffffffffffffffff811115620003475762000346620000b1565b5b6200035382546200010f565b6200036082828562000285565b600060209050601f83116001811462000398576000841562000383578287015190505b6200038f858262000302565b865550620003ff565b601f198416620003a88662000144565b60005b82811015620003d257848901518255600182019150602085019450602081019050620003ab565b86831015620003f25784890151620003ee601f891682620002e2565b8355505b6001600288020188555050505b505050505050565b6107ee80620004176000396000f3fe608060405234801561001057600080fd5b506004361061004c5760003560e01c8063368b877214610051578063c2d366581461006d578063c5ced0361461008b578063e21f37ce146100a7575b600080fd5b61006b600480360381019061006691906103e6565b6100c5565b005b610075610126565b60405161008291906104ae565b60405180910390f35b6100a560048036038101906100a091906103e6565b6101b4565b005b6100af6101fe565b6040516100bc91906104ae565b60405180910390f35b80600090816100d491906106e6565b503373ffffffffffffffffffffffffffffffffffffffff167fe31c2ad953ded70272b94617f9181f8cc33755f1b40f4c706660f6ee0dfb634a8260405161011b91906104ae565b60405180910390a250565b60018054610133906104ff565b80601f016020809104026020016040519081016040528092919081815260200182805461015f906104ff565b80156101ac5780601f10610181576101008083540402835291602001916101ac565b820191906000526020600020905b81548152906001019060200180831161018f57829003601f168201915b505050505081565b80600190816101c391906106e6565b507f4fcdf2659dcf2254d2bce07af2baaf0c6ddf6da052dd241b2445a2f6398ae575816040516101f391906104ae565b60405180910390a150565b6000805461020b906104ff565b80601f0160208091040260200160405190810160405280929190818152602001828054610237906104ff565b80156102845780601f1061025957610100808354040283529160200191610284565b820191906000526020600020905b81548152906001019060200180831161026757829003601f168201915b505050505081565b6000604051905090565b600080fd5b600080fd5b600080fd5b600080fd5b6000601f19601f8301169050919050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052604160045260246000fd5b6102f3826102aa565b810181811067ffffffffffffffff82111715610312576103116102bb565b5b80604052505050565b600061032561028c565b905061033182826102ea565b919050565b600067ffffffffffffffff821115610351576103506102bb565b5b61035a826102aa565b9050602081019050919050565b82818337600083830152505050565b600061038961038484610336565b61031b565b9050828152602081018484840111156103a5576103a46102a5565b5b6103b0848285610367565b509392505050565b600082601f8301126103cd576103cc6102a0565b5b81356103dd848260208601610376565b91505092915050565b6000602082840312156103fc576103fb610296565b5b600082013567ffffffffffffffff81111561041a5761041961029b565b5b610426848285016103b8565b91505092915050565b600081519050919050565b600082825260208201905092915050565b60005b8381101561046957808201518184015260208101905061044e565b60008484015250505050565b60006104808261042f565b61048a818561043a565b935061049a81856020860161044b565b6104a3816102aa565b840191505092915050565b600060208201905081810360008301526104c88184610475565b905092915050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052602260045260246000fd5b6000600282049050600182168061051757607f821691505b60208210810361052a576105296104d0565b5b50919050565b60008190508160005260206000209050919050565b60006020601f8301049050919050565b600082821b905092915050565b6000600883026105927fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff82610555565b61059c8683610555565b95508019841693508086168417925050509392505050565b6000819050919050565b6000819050919050565b60006105e36105de6105d9846105b4565b6105be565b6105b4565b9050919050565b6000819050919050565b6105fd836105c8565b610611610609826105ea565b848454610562565b825550505050565b600090565b610626610619565b6106318184846105f4565b505050565b5b818110156106555761064a60008261061e565b600181019050610637565b5050565b601f82111561069a5761066b81610530565b61067484610545565b81016020851015610683578190505b61069761068f85610545565b830182610636565b50505b505050565b600082821c905092915050565b60006106bd6000198460080261069f565b1980831691505092915050565b60006106d683836106ac565b9150826002028217905092915050565b6106ef8261042f565b67ffffffffffffffff811115610708576107076102bb565b5b61071282546104ff565b61071d828285610659565b600060209050601f831160018114610750576000841561073e578287015190505b61074885826106ca565b8655506107b0565b601f19841661075e86610530565b60005b8281101561078657848901518255600182019150602085019450602081019050610761565b868310156107a3578489015161079f601f8916826106ac565b8355505b6001600288020188555050505b50505050505056fea264697066735822122076146d8c796917af248ecb981f38094293788d92ad21704dc623fd8412cb9dc964736f6c63430008120033` + abiString := ` + [ + { + "inputs": [], + "stateMutability": "nonpayable", + "type": "constructor" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "string", + "name": "newMessage", + "type": "string" + } + ], + "name": "Message2Updated", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "string", + "name": "newMessage", + "type": "string" + }, + { + "indexed": true, + "internalType": "address", + "name": "sender", + "type": "address" + } + ], + "name": "MessageUpdatedWithAddress", + "type": "event" + }, + { + "inputs": [], + "name": "message", + "outputs": [ + { + "internalType": "string", + "name": "", + "type": "string" + } + ], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [], + "name": "message2", + "outputs": [ + { + "internalType": "string", + "name": "", + "type": "string" + } + ], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [ + { + "internalType": "string", + "name": "newMessage", + "type": "string" + } + ], + "name": "setMessage", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + }, + { + "inputs": [ + { + "internalType": "string", + "name": "newMessage", + "type": "string" + } + ], + "name": "setMessage2", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + } +] + ` + + _, contractAddress, err := DeploySmartContract(user0.HTTPClient, user0.Wallets[0], bytecode) + require.NoError(t, err) + fmt.Println("Deployed contract address: ", contractAddress) + + // contract abi + contractAbi, err := abi.JSON(strings.NewReader(abiString)) + require.NoError(t, err) + + // check if contract was deployed and call one of the implicit getter functions + // call getter for a message + resultMessage, err := getStringValueFromSmartContractGetter(contractAddress, contractAbi, "message", user1.HTTPClient) + require.NoError(t, err) + + // check if the value is the same as hardcoded in smart contract + hardcodedMessageValue := "foo" + assert.Equal(t, hardcodedMessageValue, resultMessage) + + // subscribe with all three users for all events in deployed contract + var user0logs []types.Log + var user1logs []types.Log + var user2logs []types.Log + subscribeToEvents([]gethcommon.Address{contractAddress}, nil, user0.WSClient, &user0logs) + subscribeToEvents([]gethcommon.Address{contractAddress}, nil, user1.WSClient, &user1logs) + subscribeToEvents([]gethcommon.Address{contractAddress}, nil, user2.WSClient, &user2logs) + + time.Sleep(time.Second) + + // user1 calls setMessage and setMessage2 on deployed smart contract with the account + // that was registered as the first in OG + user1MessageValue := "user1PrivateEvent" + // interact with smart contract and cause events to be emitted + _, err = InteractWithSmartContract(user1.HTTPClient, user1.Wallets[0], contractAbi, "setMessage", "user1PrivateEvent", contractAddress) + require.NoError(t, err) + _, err = InteractWithSmartContract(user1.HTTPClient, user1.Wallets[0], contractAbi, "setMessage2", "user1PublicEvent", contractAddress) + require.NoError(t, err) + + // check if value was changed in the smart contract with the interactions above + resultMessage, err = getStringValueFromSmartContractGetter(contractAddress, contractAbi, "message", user1.HTTPClient) + require.NoError(t, err) + assert.Equal(t, user1MessageValue, resultMessage) + + // user2 calls setMessage and setMessage2 on deployed smart contract with the account + // that was registered as the second in OG + user2MessageValue := "user2PrivateEvent" + // interact with smart contract and cause events to be emitted + _, err = InteractWithSmartContract(user2.HTTPClient, user2.Wallets[1], contractAbi, "setMessage", "user2PrivateEvent", contractAddress) + require.NoError(t, err) + _, err = InteractWithSmartContract(user2.HTTPClient, user2.Wallets[1], contractAbi, "setMessage2", "user2PublicEvent", contractAddress) + require.NoError(t, err) + + // check if value was changed in the smart contract with the interactions above + resultMessage, err = getStringValueFromSmartContractGetter(contractAddress, contractAbi, "message", user1.HTTPClient) + require.NoError(t, err) + assert.Equal(t, user2MessageValue, resultMessage) + + // wait a few seconds to be completely sure all events arrived + time.Sleep(time.Second * 3) + + // Assert the number of logs received by each client + // user0 should see two lifecycle events (1 for each interaction with setMessage2) + assert.Equal(t, 2, len(user0logs)) + // user1 should see three events (two lifecycle events - same as user0) and event with his interaction with setMessage + assert.Equal(t, 2, len(user1logs)) // FIXME (should be 3), but we have a bug when listening to events + // user2 should see three events (two lifecycle events - same as user0) and event with his interaction with setMessage + assert.Equal(t, 3, len(user2logs)) + + // Gracefully shutdown + err = obscuroGwContainer.Stop() + assert.NoError(t, err) +} + func transferRandomAddr(t *testing.T, client *ethclient.Client, w wallet.Wallet) common.TxHash { ctx := context.Background() toAddr := datagenerator.RandomAddress() @@ -163,7 +417,7 @@ func transferRandomAddr(t *testing.T, client *ethclient.Client, w wallet.Wallet) func createObscuroNetwork(t *testing.T, startPort int) *params.SimWallets { // Create the Obscuro network. numberOfNodes := 1 - wallets := params.NewSimWallets(1, numberOfNodes, integration.EthereumChainID, integration.ObscuroChainID) + wallets := params.NewSimWallets(5, numberOfNodes, integration.EthereumChainID, integration.ObscuroChainID) simParams := params.SimParams{ NumberOfNodes: numberOfNodes, AvgBlockDuration: 1 * time.Second, @@ -200,3 +454,170 @@ func waitServerIsReady(serverAddr string) error { } return fmt.Errorf("timed out before server was ready") } + +func ComputeContractAddress(sender gethcommon.Address, nonce uint64) (gethcommon.Address, error) { + // RLP encode the byte array of the sender's address and nonce + encoded, err := rlp.EncodeToBytes([]interface{}{sender, nonce}) + if err != nil { + return gethcommon.Address{}, err + } + // Compute the Keccak-256 hash of the RLP encoded byte array + hash := crypto.Keccak256(encoded) + + // The contract address is the last 20 bytes of this hash + return gethcommon.BytesToAddress(hash[len(hash)-20:]), nil +} + +func TransferETHToAddress(client *ethclient.Client, wallet wallet.Wallet, toAddress gethcommon.Address, amount int64) (*types.Receipt, error) { + transferTx1 := types.LegacyTx{ + Nonce: wallet.GetNonceAndIncrement(), + To: &toAddress, + Value: big.NewInt(amount), + Gas: uint64(1_000_000), + GasPrice: gethcommon.Big1, + Data: nil, + } + signedTx, err := wallet.SignTransaction(&transferTx1) + if err != nil { + return nil, err + } + err = client.SendTransaction(context.Background(), signedTx) + if err != nil { + return nil, err + } + return AwaitTransactionReceipt(context.Background(), client, signedTx.Hash(), 2*time.Second) +} + +func DeploySmartContract(client *ethclient.Client, wallet wallet.Wallet, bytecode string) (*types.Receipt, gethcommon.Address, error) { + contractNonce := wallet.GetNonceAndIncrement() + contractTx := types.LegacyTx{ + Nonce: contractNonce, + Gas: uint64(1_000_000), + GasPrice: gethcommon.Big1, + Data: gethcommon.FromHex(bytecode), + } + + signedTx, err := wallet.SignTransaction(&contractTx) + if err != nil { + return nil, gethcommon.Address{}, err + } + + err = client.SendTransaction(context.Background(), signedTx) + if err != nil { + return nil, gethcommon.Address{}, err + } + + // await for the transaction to be included in a block + txReceiptContractCreation, err := AwaitTransactionReceipt(context.Background(), client, signedTx.Hash(), 2*time.Second) + if err != nil { + return nil, gethcommon.Address{}, err + } + + // get contract address + contractAddress, err := ComputeContractAddress(wallet.Address(), contractNonce) + if err != nil { + return nil, gethcommon.Address{}, err + } + + return txReceiptContractCreation, contractAddress, nil +} + +func InteractWithSmartContract(client *ethclient.Client, wallet wallet.Wallet, contractAbi abi.ABI, methodName string, methodParam string, contractAddress gethcommon.Address) (*types.Receipt, error) { + contractInteractionData, err := contractAbi.Pack(methodName, methodParam) + if err != nil { + return nil, err + } + + interactionTx := types.LegacyTx{ + Nonce: wallet.GetNonceAndIncrement(), + To: &contractAddress, + Gas: uint64(1_000_000), + GasPrice: gethcommon.Big1, + Data: contractInteractionData, + } + signedTx, err := wallet.SignTransaction(&interactionTx) + if err != nil { + return nil, err + } + err = client.SendTransaction(context.Background(), signedTx) + if err != nil { + return nil, err + } + + txReceipt, err := AwaitTransactionReceipt(context.Background(), client, signedTx.Hash(), 2*time.Second) + if err != nil { + return nil, err + } + + return txReceipt, nil +} + +func getStringValueFromSmartContractGetter(contractAddress gethcommon.Address, contractAbi abi.ABI, method string, client *ethclient.Client) (string, error) { + contract := bind.NewBoundContract(contractAddress, contractAbi, client, client, client) + var resultMessage string + callOpts := &bind.CallOpts{} + results := make([]interface{}, 1) + results[0] = &resultMessage + err := contract.Call(callOpts, &results, method) + if err != nil { + return "", err + } + + return resultMessage, nil +} + +func AwaitTransactionReceipt(ctx context.Context, client *ethclient.Client, txHash gethcommon.Hash, timeout time.Duration) (*types.Receipt, error) { + timeoutStrategy := retry.NewTimeoutStrategy(timeout, time.Second) + return AwaitTransactionReceiptWithRetryStrategy(ctx, client, txHash, timeoutStrategy) +} + +func AwaitTransactionReceiptWithRetryStrategy(ctx context.Context, client *ethclient.Client, txHash gethcommon.Hash, retryStrategy retry.Strategy) (*types.Receipt, error) { + retryStrategy.Reset() + for { + select { + case <-ctx.Done(): + return nil, fmt.Errorf("context cancelled before receipt was received") + case <-time.After(retryStrategy.NextRetryInterval()): + receipt, err := client.TransactionReceipt(ctx, txHash) + if err == nil { + return receipt, nil + } + if retryStrategy.Done() { + return nil, fmt.Errorf("receipt not found - %s - %w", retryStrategy.Summary(), err) + } + } + } +} + +func subscribeToEvents(addresses []gethcommon.Address, topics [][]gethcommon.Hash, client *ethclient.Client, logs *[]types.Log) { + // Make a subscription + filterQuery := ethereum.FilterQuery{ + Addresses: addresses, + FromBlock: big.NewInt(0), // todo (@ziga) - without those we get errors - fix that and make them configurable + ToBlock: big.NewInt(10000), + Topics: topics, + } + logsCh := make(chan types.Log) + + subscription, err := client.SubscribeFilterLogs(context.Background(), filterQuery, logsCh) + if err != nil { + fmt.Printf("Failed to subscribe to filter logs: %v\n", err) + } + // todo (@ziga) - unsubscribe when it is fixed... + // defer subscription.Unsubscribe() // cleanup + + // Listen for logs in a goroutine + go func() { + for { + select { + case err := <-subscription.Err(): + fmt.Printf("Error from logs subscription: %v\n", err) + return + case log := <-logsCh: + // append logs to be visible from the main thread + *logs = append(*logs, log) + } + } + }() + +} diff --git a/tools/walletextension/accountmanager/account_manager.go b/tools/walletextension/accountmanager/account_manager.go index 87333e9d3c..a28e35de38 100644 --- a/tools/walletextension/accountmanager/account_manager.go +++ b/tools/walletextension/accountmanager/account_manager.go @@ -6,9 +6,8 @@ import ( "fmt" "strings" - "github.com/obscuronet/go-obscuro/tools/walletextension/subscriptions" - "github.com/ethereum/go-ethereum/eth/filters" + "github.com/obscuronet/go-obscuro/tools/walletextension/subscriptions" "github.com/obscuronet/go-obscuro/go/common/gethencoding" @@ -63,19 +62,12 @@ func (m *AccountManager) ProxyRequest(rpcReq *wecommon.RPCRequest, rpcResp *inte if err != nil { return err } - err = m.subscriptionsManager.HandleNewSubscriptions(clients, rpcReq, rpcResp, userConn) if err != nil { m.logger.Error("Error subscribing to multiple clients") } return err - - //// fetch the clients from a topic (todo: @ziga - delete it) - //for _, client := range clients { - // return m.executeSubscribe(client, rpcReq, rpcResp, userConn) - //} } - return m.executeCall(rpcReq, rpcResp) } @@ -279,65 +271,6 @@ func searchDataFieldForAccount(callParams map[string]interface{}, accClients map return nil, fmt.Errorf("no known account found in data bytes") } -//func (m *AccountManager) executeSubscribe(client rpc.Client, req *wecommon.RPCRequest, resp *interface{}, userConn userconn.UserConn) error { //nolint: gocognit -// if len(req.Params) == 0 { -// return fmt.Errorf("could not subscribe as no subscription namespace was provided") -// } -// m.logger.Info(fmt.Sprintf("Subscribing client: %s for request: %s", client, req)) -// ch := make(chan common.IDAndLog) -// subscription, err := client.Subscribe(context.Background(), resp, rpc.SubscribeNamespace, ch, req.Params...) -// if err != nil { -// return fmt.Errorf("could not call %s with params %v. Cause: %w", req.Method, req.Params, err) -// } -// -// // We listen for incoming messages on the subscription. -// go func() { -// for { -// select { -// case idAndLog := <-ch: -// if userConn.IsClosed() { -// m.logger.Info("received log but websocket was closed on subscription", log.SubIDKey, idAndLog.SubID) -// return -// } -// -// jsonResponse, err := wecommon.PrepareLogResponse(idAndLog) -// if err != nil { -// m.logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) -// continue -// } -// -// m.logger.Trace(fmt.Sprintf("Forwarding log from Obscuro node: %s", jsonResponse), log.SubIDKey, idAndLog.SubID) -// err = userConn.WriteResponse(jsonResponse) -// if err != nil { -// m.logger.Error("could not write the JSON log to the websocket on subscription %", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) -// continue -// } -// -// case err = <-subscription.Err(): -// // An error on this channel means the subscription has ended, so we exit the loop. -// if userConn != nil && err != nil { -// userConn.HandleError(err.Error()) -// } -// -// return -// } -// } -// }() -// -// // We periodically check if the websocket is closed, and terminate the subscription. -// go func() { -// for { -// if userConn.IsClosed() { -// subscription.Unsubscribe() -// return -// } -// time.Sleep(100 * time.Millisecond) -// } -// }() -// -// return nil -//} - func submitCall(client *rpc.EncRPCClient, req *wecommon.RPCRequest, resp *interface{}) error { if req.Method == rpc.Call || req.Method == rpc.EstimateGas { // Never modify the original request, as it might be reused. diff --git a/tools/walletextension/subscriptions/subscriptions.go b/tools/walletextension/subscriptions/subscriptions.go index 6a342d5552..7b5ae01b8f 100644 --- a/tools/walletextension/subscriptions/subscriptions.go +++ b/tools/walletextension/subscriptions/subscriptions.go @@ -3,11 +3,8 @@ package subscriptions import ( "context" "fmt" - "strings" "time" - "github.com/ethereum/go-ethereum/crypto" - gethlog "github.com/ethereum/go-ethereum/log" gethrpc "github.com/ethereum/go-ethereum/rpc" "github.com/obscuronet/go-obscuro/go/common" @@ -18,112 +15,106 @@ import ( ) type SubscriptionManager struct { - logger gethlog.Logger + subscriptionMappings map[string][]string + logger gethlog.Logger } func New(logger gethlog.Logger) *SubscriptionManager { return &SubscriptionManager{ - logger: logger, + subscriptionMappings: make(map[string][]string), + logger: logger, } } // HandleNewSubscriptions subscribes to an event with all the clients provided. -// Doing this is necessary because we have relevancy rule and we want to subscribe sometimes with all clients to get all the events -func (s *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req *wecommon.RPCRequest, resp *interface{}, userConn userconn.UserConn) error { +// Doing this is necessary because we have relevancy rule, and we want to subscribe sometimes with all clients to get all the events +func (sm *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req *wecommon.RPCRequest, resp *interface{}, userConn userconn.UserConn) error { if len(req.Params) == 0 { return fmt.Errorf("could not subscribe as no subscription namespace was provided") } - // create a chanel that will collect the data from all subscriptions - commonChannel := make(chan common.IDAndLog) + sm.logger.Info(fmt.Sprintf("Subscribing to event %s with %d clients", req.Params, len(clients))) - // save subscriptions - subscriptions := make([]*gethrpc.ClientSubscription, 0, len(clients)) - subscriptionIDS := make([]string, 0, len(clients)) + // create a common channel for subscriptions from all clients + funnelMultipleAccountsChan := make(chan common.IDAndLog) - // TODO: Create a buffered channel and perform deduplication of logs or implement additional logic to filter logs + // read from a multiple accounts channel and write results to userConn + go readFromChannelAndWriteToUserConn(funnelMultipleAccountsChan, userConn, sm.logger) - // Do periodic checks if userConn is closed and unsubscribe from all subscriptions for this user - go func() { - for { - if userConn.IsClosed() { - for _, subscription := range subscriptions { - subscription.Unsubscribe() - } - subscriptions = []*gethrpc.ClientSubscription{} - return - } - time.Sleep(100 * time.Millisecond) + // iterate over all clients and subscribe for each of them + for _, client := range clients { + // fmt.Println("Subscribing with client: ", client) + subscription, err := client.Subscribe(context.Background(), resp, rpc.SubscribeNamespace, funnelMultipleAccountsChan, req.Params...) + if err != nil { + return fmt.Errorf("could not call %s with params %v. Cause: %w", req.Method, req.Params, err) } - }() - - // Send all logs from common channel to user (via userConn) - go func() { - for idAndLog := range commonChannel { - if userConn.IsClosed() { - // Log that websocket was closed - unsubscribing is handled by periodic checks in separate goroutine - s.logger.Info("received log but websocket was closed on subscription", log.SubIDKey, idAndLog.SubID) - return - } - jsonResponse, err := wecommon.PrepareLogResponse(idAndLog) - fmt.Println("We have a log: ", string(jsonResponse)) + // Add map subscriptionIDs + if currentNodeSubscriptionID, ok := (*resp).(string); ok { + // TODO (@ziga): Currently we use the same value for node and user subscriptionID - this will change after + // subscribing with multiple accounts + sm.UpdateSubscriptionMapping(currentNodeSubscriptionID, currentNodeSubscriptionID) + } + + // We periodically check if the websocket is closed, and terminate the subscription. + go checkIfUserConnIsClosedAndUnsubscribe(userConn, subscription) + + return nil + // TODO (@ziga) + // At this stage we want to use only the first account - same as before + // introduce subscribing with all accounts in another PR ) + } + return nil +} + +func readFromChannelAndWriteToUserConn(channel chan common.IDAndLog, userConn userconn.UserConn, logger gethlog.Logger) { + for { + select { + case data := <-channel: + jsonResponse, err := wecommon.PrepareLogResponse(data) if err != nil { - s.logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) + logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, data.SubID, log.ErrKey, err) continue } + + logger.Trace(fmt.Sprintf("Forwarding log from Obscuro node: %s", jsonResponse), log.SubIDKey, data.SubID) err = userConn.WriteResponse(jsonResponse) if err != nil { - s.logger.Error("could not write the JSON log to the websocket on subscription %", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) + logger.Error("could not write the JSON log to the websocket on subscription %", log.SubIDKey, data.SubID, log.ErrKey, err) continue } } - }() + } +} - // loop over all clients and create a new subscription for each of them - s.logger.Info(fmt.Sprintf("Subscribing to: %d clients", len(clients))) - for _, client := range clients { - s.logger.Info(fmt.Sprintf("Subscribing for an event with client: %s", client)) - fmt.Println("Subscribing to logs with client: ", client) - var subscriptionID interface{} - subscription, err := s.addSubscription(client, req, &subscriptionID, commonChannel) - strSubscriptionID, isOK := subscriptionID.(string) - if err != nil || !isOK { - s.logger.Info(fmt.Sprintf("Error subscribing: %v", err)) - continue +func checkIfUserConnIsClosedAndUnsubscribe(userConn userconn.UserConn, subscription *gethrpc.ClientSubscription) { + for { + if userConn.IsClosed() { + subscription.Unsubscribe() + return } - // If there was no error, the subscription was successful. Store it for unsubscribing in the future - subscriptions = append(subscriptions, subscription) - subscriptionIDS = append(subscriptionIDS, strSubscriptionID) - s.logger.Info(fmt.Sprintf("Subscribed with subscription ID: %s", subscriptionID)) + time.Sleep(100 * time.Millisecond) } - - // create a response with new subscriptionID by concatenating them and computing hash of the concatenated string - combinedSubscriptionIDS := strings.Join(subscriptionIDS, "") - // Compute Keccak-256 hash - subscriptionsIDHash := crypto.Keccak256([]byte(combinedSubscriptionIDS)) - // Convert hash to hex string for better readability - *resp = fmt.Sprintf("%x", subscriptionsIDHash) - - // TODO: - // We need to store subscriptionsIDHash and subscriptionIDS and have them available for unsubscribe - - // where is the best place to store them? - // - 1. option is database -> More complicated, - // can contain elements that are not relevant anymore in case of crashes, etd. - // - 2. option is in-memory storage, it si simpler, but will consume more RAM, - // easier to handle since on every crash/restart it is cleared (and also all the subscriptions are dropped by our logic) - - return nil } -func (s *SubscriptionManager) addSubscription(client rpc.Client, req *wecommon.RPCRequest, rpcResp *interface{}, commonChannel chan common.IDAndLog) (*gethrpc.ClientSubscription, error) { - s.logger.Info(fmt.Sprintf("Subscribing client: %s for request: %s", client, req)) +func (sm *SubscriptionManager) UpdateSubscriptionMapping(userSubscriptionID string, obscuroNodeSubscriptionID string) { + existingUserIDs, exists := sm.subscriptionMappings[userSubscriptionID] - // Subscribe using the provided rpc.Client, all subscriptions have the same channel - subscription, err := client.Subscribe(context.Background(), rpcResp, rpc.SubscribeNamespace, commonChannel, req.Params...) - if err != nil { - return nil, fmt.Errorf("could not call %s with params %v. Cause: %w", req.Method, req.Params, err) + if !exists { + sm.subscriptionMappings[userSubscriptionID] = []string{obscuroNodeSubscriptionID} + return } - return subscription, nil + // Check if obscuroNodeSubscriptionID already exists to avoid duplication + alreadyExists := false + for _, existingID := range existingUserIDs { + if obscuroNodeSubscriptionID == existingID { + alreadyExists = true + break + } + } + + if !alreadyExists { + sm.subscriptionMappings[userSubscriptionID] = append(existingUserIDs, obscuroNodeSubscriptionID) + } }