From fb1a12b3392fb3b7f7b0922c94cf2ca2ea80c13f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=BDiga=20Kokelj?= Date: Wed, 13 Sep 2023 17:32:10 +0200 Subject: [PATCH] subscribe to multiple clients --- .../accountmanager/account_manager.go | 72 ++++-------- tools/walletextension/api/utils.go | 5 +- tools/walletextension/common/common.go | 38 +++++++ tools/walletextension/common/constants.go | 1 + .../subscriptions/subscriptions.go | 104 ++++++++++++++++++ tools/walletextension/wallet_extension.go | 5 +- 6 files changed, 169 insertions(+), 56 deletions(-) create mode 100644 tools/walletextension/subscriptions/subscriptions.go diff --git a/tools/walletextension/accountmanager/account_manager.go b/tools/walletextension/accountmanager/account_manager.go index 235acdd1f7..cdcbae3377 100644 --- a/tools/walletextension/accountmanager/account_manager.go +++ b/tools/walletextension/accountmanager/account_manager.go @@ -8,6 +8,8 @@ import ( "strings" "time" + "github.com/obscuronet/go-obscuro/tools/walletextension/subscriptions" + "github.com/ethereum/go-ethereum/eth/filters" "github.com/obscuronet/go-obscuro/go/common/gethencoding" @@ -18,8 +20,6 @@ import ( wecommon "github.com/obscuronet/go-obscuro/tools/walletextension/common" - "github.com/go-kit/kit/transport/http/jsonrpc" - "github.com/obscuronet/go-obscuro/go/common/log" "github.com/obscuronet/go-obscuro/go/rpc" "github.com/obscuronet/go-obscuro/tools/walletextension/userconn" @@ -28,8 +28,6 @@ import ( ) const ( - methodEthSubscription = "eth_subscription" - ethCallPaddedArgLen = 64 ethCallAddrPadding = "000000000000000000000000" @@ -41,15 +39,17 @@ const ( type AccountManager struct { unauthedClient rpc.Client // todo (@ziga) - create two types of clients - WS clients, and HTTP clients - to not create WS clients unnecessarily. - accountClients map[gethcommon.Address]*rpc.EncRPCClient // An encrypted RPC client per registered account - logger gethlog.Logger + accountClients map[gethcommon.Address]*rpc.EncRPCClient // An encrypted RPC client per registered account + subscriptionsManager *subscriptions.SubscriptionManager + logger gethlog.Logger } func NewAccountManager(unauthedClient rpc.Client, logger gethlog.Logger) *AccountManager { return &AccountManager{ - unauthedClient: unauthedClient, - accountClients: make(map[gethcommon.Address]*rpc.EncRPCClient), - logger: logger, + unauthedClient: unauthedClient, + accountClients: make(map[gethcommon.Address]*rpc.EncRPCClient), + subscriptionsManager: subscriptions.New(logger), + logger: logger, } } @@ -60,13 +60,18 @@ func (m *AccountManager) AddClient(address gethcommon.Address, client *rpc.EncRP // ProxyRequest tries to identify the correct EncRPCClient to proxy the request to the Obscuro node, or it will attempt // the request with all clients until it succeeds -func (m *AccountManager) ProxyRequest(rpcReq *RPCRequest, rpcResp *interface{}, userConn userconn.UserConn) error { +func (m *AccountManager) ProxyRequest(rpcReq *wecommon.RPCRequest, rpcResp *interface{}, userConn userconn.UserConn) error { if rpcReq.Method == rpc.Subscribe { clients, err := m.suggestSubscriptionClient(rpcReq) if err != nil { return err } - // fetch the clients from a topic + + err = m.subscriptionsManager.HandleNewSubscriptions(clients, rpcReq, rpcResp, userConn) + if err != nil { + m.logger.Error("Error subscribing to multiple clients") + } + // fetch the clients from a topic (todo: remove and replace with HandleNewSubscriptions) for _, client := range clients { return m.executeSubscribe(client, rpcReq, rpcResp, userConn) } @@ -79,7 +84,7 @@ const emptyFilterCriteria = "[]" // This is the value that gets passed for an em // determine the client based on the topics // if none is found use all clients from current user -func (m *AccountManager) suggestSubscriptionClient(rpcReq *RPCRequest) ([]rpc.Client, error) { +func (m *AccountManager) suggestSubscriptionClient(rpcReq *wecommon.RPCRequest) ([]rpc.Client, error) { clients := make([]rpc.Client, 0, len(m.accountClients)) // by default, if no client is identified as a candidate, then subscribe to all accounts @@ -128,7 +133,7 @@ func (m *AccountManager) suggestSubscriptionClient(rpcReq *RPCRequest) ([]rpc.Cl return clients, nil } -func (m *AccountManager) executeCall(rpcReq *RPCRequest, rpcResp *interface{}) error { +func (m *AccountManager) executeCall(rpcReq *wecommon.RPCRequest, rpcResp *interface{}) error { // for obscuro RPC requests it is important we know the sender account for the viewing key encryption/decryption suggestedClient := m.suggestAccountClient(rpcReq, m.accountClients) @@ -160,7 +165,7 @@ func (m *AccountManager) executeCall(rpcReq *RPCRequest, rpcResp *interface{}) e } // suggestAccountClient works through various methods to try and guess which available client to use for a request, returns nil if none found -func (m *AccountManager) suggestAccountClient(req *RPCRequest, accClients map[gethcommon.Address]*rpc.EncRPCClient) *rpc.EncRPCClient { +func (m *AccountManager) suggestAccountClient(req *wecommon.RPCRequest, accClients map[gethcommon.Address]*rpc.EncRPCClient) *rpc.EncRPCClient { if len(accClients) == 1 { for _, client := range accClients { // return the first (and only) client @@ -275,7 +280,7 @@ func searchDataFieldForAccount(callParams map[string]interface{}, accClients map return nil, fmt.Errorf("no known account found in data bytes") } -func (m *AccountManager) executeSubscribe(client rpc.Client, req *RPCRequest, resp *interface{}, userConn userconn.UserConn) error { //nolint: gocognit +func (m *AccountManager) executeSubscribe(client rpc.Client, req *wecommon.RPCRequest, resp *interface{}, userConn userconn.UserConn) error { //nolint: gocognit if len(req.Params) == 0 { return fmt.Errorf("could not subscribe as no subscription namespace was provided") } @@ -296,7 +301,7 @@ func (m *AccountManager) executeSubscribe(client rpc.Client, req *RPCRequest, re return } - jsonResponse, err := prepareLogResponse(idAndLog) + jsonResponse, err := wecommon.PrepareLogResponse(idAndLog) if err != nil { m.logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) continue @@ -334,7 +339,7 @@ func (m *AccountManager) executeSubscribe(client rpc.Client, req *RPCRequest, re return nil } -func submitCall(client *rpc.EncRPCClient, req *RPCRequest, resp *interface{}) error { +func submitCall(client *rpc.EncRPCClient, req *wecommon.RPCRequest, resp *interface{}) error { if req.Method == rpc.Call || req.Method == rpc.EstimateGas { // Never modify the original request, as it might be reused. req = req.Clone() @@ -390,36 +395,3 @@ func setFromFieldIfMissing(args []interface{}, account gethcommon.Address) ([]in return request, nil } - -// Formats the log to be sent as an Eth JSON-RPC response. -func prepareLogResponse(idAndLog common.IDAndLog) ([]byte, error) { - paramsMap := make(map[string]interface{}) - paramsMap[wecommon.JSONKeySubscription] = idAndLog.SubID - paramsMap[wecommon.JSONKeyResult] = idAndLog.Log - - respMap := make(map[string]interface{}) - respMap[wecommon.JSONKeyRPCVersion] = jsonrpc.Version - respMap[wecommon.JSONKeyMethod] = methodEthSubscription - respMap[wecommon.JSONKeyParams] = paramsMap - - jsonResponse, err := json.Marshal(respMap) - if err != nil { - return nil, fmt.Errorf("could not marshal log response to JSON. Cause: %w", err) - } - return jsonResponse, nil -} - -type RPCRequest struct { - ID json.RawMessage - Method string - Params []interface{} -} - -// Clone returns a new instance of the *RPCRequest -func (r *RPCRequest) Clone() *RPCRequest { - return &RPCRequest{ - ID: r.ID, - Method: r.Method, - Params: r.Params, - } -} diff --git a/tools/walletextension/api/utils.go b/tools/walletextension/api/utils.go index 8a8ef189bb..29766e4b77 100644 --- a/tools/walletextension/api/utils.go +++ b/tools/walletextension/api/utils.go @@ -4,11 +4,10 @@ import ( "encoding/json" "fmt" - "github.com/obscuronet/go-obscuro/tools/walletextension/accountmanager" "github.com/obscuronet/go-obscuro/tools/walletextension/common" ) -func parseRequest(body []byte) (*accountmanager.RPCRequest, error) { +func parseRequest(body []byte) (*common.RPCRequest, error) { // We unmarshal the JSON request var reqJSONMap map[string]json.RawMessage err := json.Unmarshal(body, &reqJSONMap) @@ -31,7 +30,7 @@ func parseRequest(body []byte) (*accountmanager.RPCRequest, error) { return nil, fmt.Errorf("could not unmarshal params list from JSON-RPC request body: %w", err) } - return &accountmanager.RPCRequest{ + return &common.RPCRequest{ ID: reqID, Method: method, Params: params, diff --git a/tools/walletextension/common/common.go b/tools/walletextension/common/common.go index 8b84a2c4c4..e6c94e2fef 100644 --- a/tools/walletextension/common/common.go +++ b/tools/walletextension/common/common.go @@ -3,10 +3,14 @@ package common import ( "crypto/ecdsa" "encoding/hex" + "encoding/json" "errors" "fmt" "regexp" + "github.com/go-kit/kit/transport/http/jsonrpc" + "github.com/obscuronet/go-obscuro/go/common" + gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/ecies" @@ -83,3 +87,37 @@ func CreateEncClient(hostRPCBindAddr string, addressBytes []byte, privateKeyByte } return encClient, nil } + +type RPCRequest struct { + ID json.RawMessage + Method string + Params []interface{} +} + +// Clone returns a new instance of the *RPCRequest +func (r *RPCRequest) Clone() *RPCRequest { + return &RPCRequest{ + ID: r.ID, + Method: r.Method, + Params: r.Params, + } +} + +// Formats the log to be sent as an Eth JSON-RPC response. +// TODO (@ziga) - Move this code to a subscriptions package once it is used only there.. +func PrepareLogResponse(idAndLog common.IDAndLog) ([]byte, error) { + paramsMap := make(map[string]interface{}) + paramsMap[JSONKeySubscription] = idAndLog.SubID + paramsMap[JSONKeyResult] = idAndLog.Log + + respMap := make(map[string]interface{}) + respMap[JSONKeyRPCVersion] = jsonrpc.Version + respMap[JSONKeyMethod] = methodEthSubscription + respMap[JSONKeyParams] = paramsMap + + jsonResponse, err := json.Marshal(respMap) + if err != nil { + return nil, fmt.Errorf("could not marshal log response to JSON. Cause: %w", err) + } + return jsonResponse, nil +} diff --git a/tools/walletextension/common/constants.go b/tools/walletextension/common/constants.go index 0d57916901..f57a1578c5 100644 --- a/tools/walletextension/common/constants.go +++ b/tools/walletextension/common/constants.go @@ -49,6 +49,7 @@ const ( GetStorageAtUserIDRequestMethodName = "getUserID" SuccessMsg = "success" APIVersion1 = "/v1" + methodEthSubscription = "eth_subscription" ) var ( diff --git a/tools/walletextension/subscriptions/subscriptions.go b/tools/walletextension/subscriptions/subscriptions.go new file mode 100644 index 0000000000..150055a606 --- /dev/null +++ b/tools/walletextension/subscriptions/subscriptions.go @@ -0,0 +1,104 @@ +package subscriptions + +import ( + "context" + "fmt" + "time" + + gethlog "github.com/ethereum/go-ethereum/log" + gethrpc "github.com/ethereum/go-ethereum/rpc" + "github.com/obscuronet/go-obscuro/go/common" + "github.com/obscuronet/go-obscuro/go/common/log" + "github.com/obscuronet/go-obscuro/go/rpc" + wecommon "github.com/obscuronet/go-obscuro/tools/walletextension/common" + "github.com/obscuronet/go-obscuro/tools/walletextension/userconn" +) + +type SubscriptionManager struct { + logger gethlog.Logger +} + +func New(logger gethlog.Logger) *SubscriptionManager { + return &SubscriptionManager{ + logger: logger, + } +} + +func (s *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req *wecommon.RPCRequest, resp *interface{}, userConn userconn.UserConn) error { + if len(req.Params) == 0 { + return fmt.Errorf("could not subscribe as no subscription namespace was provided") + } + + // create a chanel that will collect the data from all subscriptions + commonChannel := make(chan common.IDAndLog) + + // save subscriptions + subscriptions := make([]*gethrpc.ClientSubscription, 0, len(clients)) + + // TODO: Create a buffered channel and perform deduplication of logs or implement additional logic to filter logs + + // TODO: Create response... + + // Do periodic checks if userConn is closed and unsubscribe from all subscriptions for this user + go func() { + for { + if userConn.IsClosed() { + for _, subscription := range subscriptions { + subscription.Unsubscribe() + } + subscriptions = []*gethrpc.ClientSubscription{} + return + } + time.Sleep(100 * time.Millisecond) + } + }() + + // Send all logs from common channel to user (via userConn) + go func() { + for idAndLog := range commonChannel { + if userConn.IsClosed() { + // Log that websocket was closed - unsubscribing is handled by periodic checks in separate goroutine + s.logger.Info("received log but websocket was closed on subscription", log.SubIDKey, idAndLog.SubID) + return + } + + jsonResponse, err := wecommon.PrepareLogResponse(idAndLog) + if err != nil { + s.logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) + continue + } + + err = userConn.WriteResponse(jsonResponse) + if err != nil { + s.logger.Error("could not write the JSON log to the websocket on subscription %", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) + continue + } + } + }() + + // loop over all clients and create a new subscription for each of them + s.logger.Info(fmt.Sprintf("Subscribing to: %d clients", len(clients))) + for _, client := range clients { + s.logger.Info(fmt.Sprintf("Subscribing for an event with client: %s", client)) + subscription, err := s.addSubscription(client, req, resp, commonChannel) + if err != nil { + s.logger.Info(fmt.Sprintf("Error: %v", err)) + } + subscriptions = append(subscriptions, subscription) + // TODO: get subscription ID + // s.logger.Info(fmt.Sprintf("Subscribed with subscription ID: %s", subscriptionID)) + } + return nil +} + +func (s *SubscriptionManager) addSubscription(client rpc.Client, req *wecommon.RPCRequest, rpcResp *interface{}, commonChannel chan common.IDAndLog) (*gethrpc.ClientSubscription, error) { + s.logger.Info(fmt.Sprintf("Subscribing client: %s for request: %s", client, req)) + + // Subscribe using the provided rpc.Client, all subscriptions have the same channel + subscription, err := client.Subscribe(context.Background(), rpcResp, rpc.SubscribeNamespace, commonChannel, req.Params...) + if err != nil { + return nil, fmt.Errorf("could not call %s with params %v. Cause: %w", req.Method, req.Params, err) + } + + return subscription, nil +} diff --git a/tools/walletextension/wallet_extension.go b/tools/walletextension/wallet_extension.go index 77fd39456e..fd000993e6 100644 --- a/tools/walletextension/wallet_extension.go +++ b/tools/walletextension/wallet_extension.go @@ -16,7 +16,6 @@ import ( "github.com/obscuronet/go-obscuro/go/common/stopcontrol" "github.com/obscuronet/go-obscuro/go/common/viewingkey" "github.com/obscuronet/go-obscuro/go/rpc" - "github.com/obscuronet/go-obscuro/tools/walletextension/accountmanager" "github.com/obscuronet/go-obscuro/tools/walletextension/common" "github.com/obscuronet/go-obscuro/tools/walletextension/storage" "github.com/obscuronet/go-obscuro/tools/walletextension/userconn" @@ -65,7 +64,7 @@ func (w *WalletExtension) Logger() gethlog.Logger { } // ProxyEthRequest proxys an incoming user request to the enclave -func (w *WalletExtension) ProxyEthRequest(request *accountmanager.RPCRequest, conn userconn.UserConn, hexUserID string) (map[string]interface{}, error) { +func (w *WalletExtension) ProxyEthRequest(request *common.RPCRequest, conn userconn.UserConn, hexUserID string) (map[string]interface{}, error) { response := map[string]interface{}{} // all responses must contain the request id. Both successful and unsuccessful. response[common.JSONKeyRPCVersion] = jsonrpc.Version @@ -357,7 +356,7 @@ func adjustStateRoot(rpcResp interface{}, respMap map[string]interface{}) { // getStorageAtInterceptor checks if the parameters for getStorageAt are set to values that require interception // and return response or nil if the gateway should forward the request to the node. -func (w *WalletExtension) getStorageAtInterceptor(request *accountmanager.RPCRequest, hexUserID string) map[string]interface{} { +func (w *WalletExtension) getStorageAtInterceptor(request *common.RPCRequest, hexUserID string) map[string]interface{} { // check if parameters are correct, and we can intercept a request, otherwise return nil if w.checkParametersForInterceptedGetStorageAt(request.Params) { // check if userID in the parameters is also in our database