diff --git a/tools/walletextension/accountmanager/account_manager.go b/tools/walletextension/accountmanager/account_manager.go index d955db4626..cdcbae3377 100644 --- a/tools/walletextension/accountmanager/account_manager.go +++ b/tools/walletextension/accountmanager/account_manager.go @@ -5,10 +5,11 @@ import ( "encoding/json" "errors" "fmt" - "github.com/obscuronet/go-obscuro/tools/walletextension/subscriptions" "strings" "time" + "github.com/obscuronet/go-obscuro/tools/walletextension/subscriptions" + "github.com/ethereum/go-ethereum/eth/filters" "github.com/obscuronet/go-obscuro/go/common/gethencoding" @@ -66,7 +67,10 @@ func (m *AccountManager) ProxyRequest(rpcReq *wecommon.RPCRequest, rpcResp *inte return err } - m.subscriptionsManager.HandleNewSubscriptions(clients, rpcReq, rpcResp, userConn) + err = m.subscriptionsManager.HandleNewSubscriptions(clients, rpcReq, rpcResp, userConn) + if err != nil { + m.logger.Error("Error subscribing to multiple clients") + } // fetch the clients from a topic (todo: remove and replace with HandleNewSubscriptions) for _, client := range clients { return m.executeSubscribe(client, rpcReq, rpcResp, userConn) diff --git a/tools/walletextension/common/common.go b/tools/walletextension/common/common.go index 47a4453675..e6c94e2fef 100644 --- a/tools/walletextension/common/common.go +++ b/tools/walletextension/common/common.go @@ -6,9 +6,10 @@ import ( "encoding/json" "errors" "fmt" + "regexp" + "github.com/go-kit/kit/transport/http/jsonrpc" "github.com/obscuronet/go-obscuro/go/common" - "regexp" gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" diff --git a/tools/walletextension/subscriptions/subscriptions.go b/tools/walletextension/subscriptions/subscriptions.go index 75d980555f..b2665acfc9 100644 --- a/tools/walletextension/subscriptions/subscriptions.go +++ b/tools/walletextension/subscriptions/subscriptions.go @@ -3,6 +3,8 @@ package subscriptions import ( "context" "fmt" + "time" + gethlog "github.com/ethereum/go-ethereum/log" gethrpc "github.com/ethereum/go-ethereum/rpc" "github.com/obscuronet/go-obscuro/go/common" @@ -10,7 +12,6 @@ import ( "github.com/obscuronet/go-obscuro/go/rpc" wecommon "github.com/obscuronet/go-obscuro/tools/walletextension/common" "github.com/obscuronet/go-obscuro/tools/walletextension/userconn" - "time" ) type SubscriptionManager struct { @@ -32,11 +33,13 @@ func (s *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req * commonChannel := make(chan common.IDAndLog) // save subscriptions - var subscriptions []*gethrpc.ClientSubscription + subscriptions := make([]*gethrpc.ClientSubscription, 0, len(clients)) // TODO: Create a buffered channel and perform deduplication of logs or implement additional logic to filter logs - //Do periodic checks if userConn is closed and unsubscribe from all subscriptions for this user + // TODO: Create response... + + // Do periodic checks if userConn is closed and unsubscribe from all subscriptions for this user go func() { for { if userConn.IsClosed() { @@ -80,22 +83,22 @@ func (s *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req * s.logger.Info(fmt.Sprintf("Subscribing to: %d clients", len(clients))) for _, client := range clients { s.logger.Info(fmt.Sprintf("Subscribing for an event with client: %s", client)) - subscription, err := s.addSubscription(client, req, commonChannel) + subscription, err := s.addSubscription(client, req, resp, commonChannel) if err != nil { s.logger.Info(fmt.Sprintf("Error: %v", err)) } subscriptions = append(subscriptions, subscription) // TODO: get subscription ID - //s.logger.Info(fmt.Sprintf("Subscribed with subscription ID: %s", subscriptionID)) + // s.logger.Info(fmt.Sprintf("Subscribed with subscription ID: %s", subscriptionID)) } return nil } -func (s *SubscriptionManager) addSubscription(client rpc.Client, req *wecommon.RPCRequest, commonChannel chan common.IDAndLog) (*gethrpc.ClientSubscription, error) { +func (s *SubscriptionManager) addSubscription(client rpc.Client, req *wecommon.RPCRequest, rpcResp *interface{}, commonChannel chan common.IDAndLog) (*gethrpc.ClientSubscription, error) { s.logger.Info(fmt.Sprintf("Subscribing client: %s for request: %s", client, req)) // Subscribe using the provided rpc.Client, all subscriptions have the same channel - subscription, err := client.Subscribe(context.Background(), nil, rpc.SubscribeNamespace, commonChannel, req.Params...) + subscription, err := client.Subscribe(context.Background(), rpcResp, rpc.SubscribeNamespace, commonChannel, req.Params...) if err != nil { return nil, fmt.Errorf("could not call %s with params %v. Cause: %w", req.Method, req.Params, err) }