Skip to content

Commit

Permalink
subscribe to multiple clients
Browse files Browse the repository at this point in the history
  • Loading branch information
zkokelj committed Sep 26, 2023
1 parent b7b4e3e commit fb1a12b
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 56 deletions.
72 changes: 22 additions & 50 deletions tools/walletextension/accountmanager/account_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strings"
"time"

"github.com/obscuronet/go-obscuro/tools/walletextension/subscriptions"

"github.com/ethereum/go-ethereum/eth/filters"

"github.com/obscuronet/go-obscuro/go/common/gethencoding"
Expand All @@ -18,8 +20,6 @@ import (

wecommon "github.com/obscuronet/go-obscuro/tools/walletextension/common"

"github.com/go-kit/kit/transport/http/jsonrpc"

"github.com/obscuronet/go-obscuro/go/common/log"
"github.com/obscuronet/go-obscuro/go/rpc"
"github.com/obscuronet/go-obscuro/tools/walletextension/userconn"
Expand All @@ -28,8 +28,6 @@ import (
)

const (
methodEthSubscription = "eth_subscription"

ethCallPaddedArgLen = 64
ethCallAddrPadding = "000000000000000000000000"

Expand All @@ -41,15 +39,17 @@ const (
type AccountManager struct {
unauthedClient rpc.Client
// todo (@ziga) - create two types of clients - WS clients, and HTTP clients - to not create WS clients unnecessarily.
accountClients map[gethcommon.Address]*rpc.EncRPCClient // An encrypted RPC client per registered account
logger gethlog.Logger
accountClients map[gethcommon.Address]*rpc.EncRPCClient // An encrypted RPC client per registered account
subscriptionsManager *subscriptions.SubscriptionManager
logger gethlog.Logger
}

func NewAccountManager(unauthedClient rpc.Client, logger gethlog.Logger) *AccountManager {
return &AccountManager{
unauthedClient: unauthedClient,
accountClients: make(map[gethcommon.Address]*rpc.EncRPCClient),
logger: logger,
unauthedClient: unauthedClient,
accountClients: make(map[gethcommon.Address]*rpc.EncRPCClient),
subscriptionsManager: subscriptions.New(logger),
logger: logger,
}
}

Expand All @@ -60,13 +60,18 @@ func (m *AccountManager) AddClient(address gethcommon.Address, client *rpc.EncRP

// ProxyRequest tries to identify the correct EncRPCClient to proxy the request to the Obscuro node, or it will attempt
// the request with all clients until it succeeds
func (m *AccountManager) ProxyRequest(rpcReq *RPCRequest, rpcResp *interface{}, userConn userconn.UserConn) error {
func (m *AccountManager) ProxyRequest(rpcReq *wecommon.RPCRequest, rpcResp *interface{}, userConn userconn.UserConn) error {
if rpcReq.Method == rpc.Subscribe {
clients, err := m.suggestSubscriptionClient(rpcReq)
if err != nil {
return err
}
// fetch the clients from a topic

err = m.subscriptionsManager.HandleNewSubscriptions(clients, rpcReq, rpcResp, userConn)
if err != nil {
m.logger.Error("Error subscribing to multiple clients")
}
// fetch the clients from a topic (todo: remove and replace with HandleNewSubscriptions)
for _, client := range clients {
return m.executeSubscribe(client, rpcReq, rpcResp, userConn)
}
Expand All @@ -79,7 +84,7 @@ const emptyFilterCriteria = "[]" // This is the value that gets passed for an em

// determine the client based on the topics
// if none is found use all clients from current user
func (m *AccountManager) suggestSubscriptionClient(rpcReq *RPCRequest) ([]rpc.Client, error) {
func (m *AccountManager) suggestSubscriptionClient(rpcReq *wecommon.RPCRequest) ([]rpc.Client, error) {
clients := make([]rpc.Client, 0, len(m.accountClients))

// by default, if no client is identified as a candidate, then subscribe to all accounts
Expand Down Expand Up @@ -128,7 +133,7 @@ func (m *AccountManager) suggestSubscriptionClient(rpcReq *RPCRequest) ([]rpc.Cl
return clients, nil
}

func (m *AccountManager) executeCall(rpcReq *RPCRequest, rpcResp *interface{}) error {
func (m *AccountManager) executeCall(rpcReq *wecommon.RPCRequest, rpcResp *interface{}) error {
// for obscuro RPC requests it is important we know the sender account for the viewing key encryption/decryption
suggestedClient := m.suggestAccountClient(rpcReq, m.accountClients)

Expand Down Expand Up @@ -160,7 +165,7 @@ func (m *AccountManager) executeCall(rpcReq *RPCRequest, rpcResp *interface{}) e
}

// suggestAccountClient works through various methods to try and guess which available client to use for a request, returns nil if none found
func (m *AccountManager) suggestAccountClient(req *RPCRequest, accClients map[gethcommon.Address]*rpc.EncRPCClient) *rpc.EncRPCClient {
func (m *AccountManager) suggestAccountClient(req *wecommon.RPCRequest, accClients map[gethcommon.Address]*rpc.EncRPCClient) *rpc.EncRPCClient {
if len(accClients) == 1 {
for _, client := range accClients {
// return the first (and only) client
Expand Down Expand Up @@ -275,7 +280,7 @@ func searchDataFieldForAccount(callParams map[string]interface{}, accClients map
return nil, fmt.Errorf("no known account found in data bytes")
}

func (m *AccountManager) executeSubscribe(client rpc.Client, req *RPCRequest, resp *interface{}, userConn userconn.UserConn) error { //nolint: gocognit
func (m *AccountManager) executeSubscribe(client rpc.Client, req *wecommon.RPCRequest, resp *interface{}, userConn userconn.UserConn) error { //nolint: gocognit
if len(req.Params) == 0 {
return fmt.Errorf("could not subscribe as no subscription namespace was provided")
}
Expand All @@ -296,7 +301,7 @@ func (m *AccountManager) executeSubscribe(client rpc.Client, req *RPCRequest, re
return
}

jsonResponse, err := prepareLogResponse(idAndLog)
jsonResponse, err := wecommon.PrepareLogResponse(idAndLog)
if err != nil {
m.logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, idAndLog.SubID, log.ErrKey, err)
continue
Expand Down Expand Up @@ -334,7 +339,7 @@ func (m *AccountManager) executeSubscribe(client rpc.Client, req *RPCRequest, re
return nil
}

func submitCall(client *rpc.EncRPCClient, req *RPCRequest, resp *interface{}) error {
func submitCall(client *rpc.EncRPCClient, req *wecommon.RPCRequest, resp *interface{}) error {
if req.Method == rpc.Call || req.Method == rpc.EstimateGas {
// Never modify the original request, as it might be reused.
req = req.Clone()
Expand Down Expand Up @@ -390,36 +395,3 @@ func setFromFieldIfMissing(args []interface{}, account gethcommon.Address) ([]in

return request, nil
}

// Formats the log to be sent as an Eth JSON-RPC response.
func prepareLogResponse(idAndLog common.IDAndLog) ([]byte, error) {
paramsMap := make(map[string]interface{})
paramsMap[wecommon.JSONKeySubscription] = idAndLog.SubID
paramsMap[wecommon.JSONKeyResult] = idAndLog.Log

respMap := make(map[string]interface{})
respMap[wecommon.JSONKeyRPCVersion] = jsonrpc.Version
respMap[wecommon.JSONKeyMethod] = methodEthSubscription
respMap[wecommon.JSONKeyParams] = paramsMap

jsonResponse, err := json.Marshal(respMap)
if err != nil {
return nil, fmt.Errorf("could not marshal log response to JSON. Cause: %w", err)
}
return jsonResponse, nil
}

type RPCRequest struct {
ID json.RawMessage
Method string
Params []interface{}
}

// Clone returns a new instance of the *RPCRequest
func (r *RPCRequest) Clone() *RPCRequest {
return &RPCRequest{
ID: r.ID,
Method: r.Method,
Params: r.Params,
}
}
5 changes: 2 additions & 3 deletions tools/walletextension/api/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ import (
"encoding/json"
"fmt"

"github.com/obscuronet/go-obscuro/tools/walletextension/accountmanager"
"github.com/obscuronet/go-obscuro/tools/walletextension/common"
)

func parseRequest(body []byte) (*accountmanager.RPCRequest, error) {
func parseRequest(body []byte) (*common.RPCRequest, error) {
// We unmarshal the JSON request
var reqJSONMap map[string]json.RawMessage
err := json.Unmarshal(body, &reqJSONMap)
Expand All @@ -31,7 +30,7 @@ func parseRequest(body []byte) (*accountmanager.RPCRequest, error) {
return nil, fmt.Errorf("could not unmarshal params list from JSON-RPC request body: %w", err)
}

return &accountmanager.RPCRequest{
return &common.RPCRequest{
ID: reqID,
Method: method,
Params: params,
Expand Down
38 changes: 38 additions & 0 deletions tools/walletextension/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package common
import (
"crypto/ecdsa"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"regexp"

"github.com/go-kit/kit/transport/http/jsonrpc"
"github.com/obscuronet/go-obscuro/go/common"

gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/ecies"
Expand Down Expand Up @@ -83,3 +87,37 @@ func CreateEncClient(hostRPCBindAddr string, addressBytes []byte, privateKeyByte
}
return encClient, nil
}

type RPCRequest struct {
ID json.RawMessage
Method string
Params []interface{}
}

// Clone returns a new instance of the *RPCRequest
func (r *RPCRequest) Clone() *RPCRequest {
return &RPCRequest{
ID: r.ID,
Method: r.Method,
Params: r.Params,
}
}

// Formats the log to be sent as an Eth JSON-RPC response.
// TODO (@ziga) - Move this code to a subscriptions package once it is used only there..
func PrepareLogResponse(idAndLog common.IDAndLog) ([]byte, error) {
paramsMap := make(map[string]interface{})
paramsMap[JSONKeySubscription] = idAndLog.SubID
paramsMap[JSONKeyResult] = idAndLog.Log

respMap := make(map[string]interface{})
respMap[JSONKeyRPCVersion] = jsonrpc.Version
respMap[JSONKeyMethod] = methodEthSubscription
respMap[JSONKeyParams] = paramsMap

jsonResponse, err := json.Marshal(respMap)
if err != nil {
return nil, fmt.Errorf("could not marshal log response to JSON. Cause: %w", err)
}
return jsonResponse, nil
}
1 change: 1 addition & 0 deletions tools/walletextension/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
GetStorageAtUserIDRequestMethodName = "getUserID"
SuccessMsg = "success"
APIVersion1 = "/v1"
methodEthSubscription = "eth_subscription"
)

var (
Expand Down
104 changes: 104 additions & 0 deletions tools/walletextension/subscriptions/subscriptions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package subscriptions

import (
"context"
"fmt"
"time"

gethlog "github.com/ethereum/go-ethereum/log"
gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/obscuronet/go-obscuro/go/common"
"github.com/obscuronet/go-obscuro/go/common/log"
"github.com/obscuronet/go-obscuro/go/rpc"
wecommon "github.com/obscuronet/go-obscuro/tools/walletextension/common"
"github.com/obscuronet/go-obscuro/tools/walletextension/userconn"
)

type SubscriptionManager struct {
logger gethlog.Logger
}

func New(logger gethlog.Logger) *SubscriptionManager {
return &SubscriptionManager{
logger: logger,
}
}

func (s *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req *wecommon.RPCRequest, resp *interface{}, userConn userconn.UserConn) error {
if len(req.Params) == 0 {
return fmt.Errorf("could not subscribe as no subscription namespace was provided")
}

// create a chanel that will collect the data from all subscriptions
commonChannel := make(chan common.IDAndLog)

// save subscriptions
subscriptions := make([]*gethrpc.ClientSubscription, 0, len(clients))

// TODO: Create a buffered channel and perform deduplication of logs or implement additional logic to filter logs

// TODO: Create response...

// Do periodic checks if userConn is closed and unsubscribe from all subscriptions for this user
go func() {
for {
if userConn.IsClosed() {
for _, subscription := range subscriptions {
subscription.Unsubscribe()
}
subscriptions = []*gethrpc.ClientSubscription{}
return
}
time.Sleep(100 * time.Millisecond)
}
}()

// Send all logs from common channel to user (via userConn)
go func() {
for idAndLog := range commonChannel {
if userConn.IsClosed() {
// Log that websocket was closed - unsubscribing is handled by periodic checks in separate goroutine
s.logger.Info("received log but websocket was closed on subscription", log.SubIDKey, idAndLog.SubID)
return
}

jsonResponse, err := wecommon.PrepareLogResponse(idAndLog)
if err != nil {
s.logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, idAndLog.SubID, log.ErrKey, err)
continue
}

err = userConn.WriteResponse(jsonResponse)
if err != nil {
s.logger.Error("could not write the JSON log to the websocket on subscription %", log.SubIDKey, idAndLog.SubID, log.ErrKey, err)
continue
}
}
}()

// loop over all clients and create a new subscription for each of them
s.logger.Info(fmt.Sprintf("Subscribing to: %d clients", len(clients)))
for _, client := range clients {
s.logger.Info(fmt.Sprintf("Subscribing for an event with client: %s", client))
subscription, err := s.addSubscription(client, req, resp, commonChannel)
if err != nil {
s.logger.Info(fmt.Sprintf("Error: %v", err))
}
subscriptions = append(subscriptions, subscription)
// TODO: get subscription ID
// s.logger.Info(fmt.Sprintf("Subscribed with subscription ID: %s", subscriptionID))
}
return nil
}

func (s *SubscriptionManager) addSubscription(client rpc.Client, req *wecommon.RPCRequest, rpcResp *interface{}, commonChannel chan common.IDAndLog) (*gethrpc.ClientSubscription, error) {
s.logger.Info(fmt.Sprintf("Subscribing client: %s for request: %s", client, req))

// Subscribe using the provided rpc.Client, all subscriptions have the same channel
subscription, err := client.Subscribe(context.Background(), rpcResp, rpc.SubscribeNamespace, commonChannel, req.Params...)
if err != nil {
return nil, fmt.Errorf("could not call %s with params %v. Cause: %w", req.Method, req.Params, err)
}

return subscription, nil
}
5 changes: 2 additions & 3 deletions tools/walletextension/wallet_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/obscuronet/go-obscuro/go/common/stopcontrol"
"github.com/obscuronet/go-obscuro/go/common/viewingkey"
"github.com/obscuronet/go-obscuro/go/rpc"
"github.com/obscuronet/go-obscuro/tools/walletextension/accountmanager"
"github.com/obscuronet/go-obscuro/tools/walletextension/common"
"github.com/obscuronet/go-obscuro/tools/walletextension/storage"
"github.com/obscuronet/go-obscuro/tools/walletextension/userconn"
Expand Down Expand Up @@ -65,7 +64,7 @@ func (w *WalletExtension) Logger() gethlog.Logger {
}

// ProxyEthRequest proxys an incoming user request to the enclave
func (w *WalletExtension) ProxyEthRequest(request *accountmanager.RPCRequest, conn userconn.UserConn, hexUserID string) (map[string]interface{}, error) {
func (w *WalletExtension) ProxyEthRequest(request *common.RPCRequest, conn userconn.UserConn, hexUserID string) (map[string]interface{}, error) {
response := map[string]interface{}{}
// all responses must contain the request id. Both successful and unsuccessful.
response[common.JSONKeyRPCVersion] = jsonrpc.Version
Expand Down Expand Up @@ -357,7 +356,7 @@ func adjustStateRoot(rpcResp interface{}, respMap map[string]interface{}) {

// getStorageAtInterceptor checks if the parameters for getStorageAt are set to values that require interception
// and return response or nil if the gateway should forward the request to the node.
func (w *WalletExtension) getStorageAtInterceptor(request *accountmanager.RPCRequest, hexUserID string) map[string]interface{} {
func (w *WalletExtension) getStorageAtInterceptor(request *common.RPCRequest, hexUserID string) map[string]interface{} {
// check if parameters are correct, and we can intercept a request, otherwise return nil
if w.checkParametersForInterceptedGetStorageAt(request.Params) {
// check if userID in the parameters is also in our database
Expand Down

0 comments on commit fb1a12b

Please sign in to comment.