Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
zkokelj committed Sep 19, 2023
1 parent 66e8005 commit e87dc84
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
8 changes: 6 additions & 2 deletions tools/walletextension/accountmanager/account_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/obscuronet/go-obscuro/tools/walletextension/subscriptions"
"strings"
"time"

"github.com/obscuronet/go-obscuro/tools/walletextension/subscriptions"

"github.com/ethereum/go-ethereum/eth/filters"

"github.com/obscuronet/go-obscuro/go/common/gethencoding"
Expand Down Expand Up @@ -66,7 +67,10 @@ func (m *AccountManager) ProxyRequest(rpcReq *wecommon.RPCRequest, rpcResp *inte
return err
}

m.subscriptionsManager.HandleNewSubscriptions(clients, rpcReq, rpcResp, userConn)
err = m.subscriptionsManager.HandleNewSubscriptions(clients, rpcReq, rpcResp, userConn)
if err != nil {
m.logger.Error("Error subscribing to multiple clients")
}
// fetch the clients from a topic (todo: remove and replace with HandleNewSubscriptions)
for _, client := range clients {
return m.executeSubscribe(client, rpcReq, rpcResp, userConn)
Expand Down
3 changes: 2 additions & 1 deletion tools/walletextension/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"encoding/json"
"errors"
"fmt"
"regexp"

"github.com/go-kit/kit/transport/http/jsonrpc"
"github.com/obscuronet/go-obscuro/go/common"
"regexp"

gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
Expand Down
17 changes: 10 additions & 7 deletions tools/walletextension/subscriptions/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package subscriptions
import (
"context"
"fmt"
"time"

gethlog "github.com/ethereum/go-ethereum/log"
gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/obscuronet/go-obscuro/go/common"
"github.com/obscuronet/go-obscuro/go/common/log"
"github.com/obscuronet/go-obscuro/go/rpc"
wecommon "github.com/obscuronet/go-obscuro/tools/walletextension/common"
"github.com/obscuronet/go-obscuro/tools/walletextension/userconn"
"time"
)

type SubscriptionManager struct {
Expand All @@ -32,11 +33,13 @@ func (s *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req *
commonChannel := make(chan common.IDAndLog)

// save subscriptions
var subscriptions []*gethrpc.ClientSubscription
subscriptions := make([]*gethrpc.ClientSubscription, 0, len(clients))

// TODO: Create a buffered channel and perform deduplication of logs or implement additional logic to filter logs

//Do periodic checks if userConn is closed and unsubscribe from all subscriptions for this user
// TODO: Create response...

// Do periodic checks if userConn is closed and unsubscribe from all subscriptions for this user
go func() {
for {
if userConn.IsClosed() {
Expand Down Expand Up @@ -80,22 +83,22 @@ func (s *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req *
s.logger.Info(fmt.Sprintf("Subscribing to: %d clients", len(clients)))
for _, client := range clients {
s.logger.Info(fmt.Sprintf("Subscribing for an event with client: %s", client))
subscription, err := s.addSubscription(client, req, commonChannel)
subscription, err := s.addSubscription(client, req, resp, commonChannel)
if err != nil {
s.logger.Info(fmt.Sprintf("Error: %v", err))
}
subscriptions = append(subscriptions, subscription)
// TODO: get subscription ID
//s.logger.Info(fmt.Sprintf("Subscribed with subscription ID: %s", subscriptionID))
// s.logger.Info(fmt.Sprintf("Subscribed with subscription ID: %s", subscriptionID))
}
return nil
}

func (s *SubscriptionManager) addSubscription(client rpc.Client, req *wecommon.RPCRequest, commonChannel chan common.IDAndLog) (*gethrpc.ClientSubscription, error) {
func (s *SubscriptionManager) addSubscription(client rpc.Client, req *wecommon.RPCRequest, rpcResp *interface{}, commonChannel chan common.IDAndLog) (*gethrpc.ClientSubscription, error) {
s.logger.Info(fmt.Sprintf("Subscribing client: %s for request: %s", client, req))

// Subscribe using the provided rpc.Client, all subscriptions have the same channel
subscription, err := client.Subscribe(context.Background(), nil, rpc.SubscribeNamespace, commonChannel, req.Params...)
subscription, err := client.Subscribe(context.Background(), rpcResp, rpc.SubscribeNamespace, commonChannel, req.Params...)
if err != nil {
return nil, fmt.Errorf("could not call %s with params %v. Cause: %w", req.Method, req.Params, err)
}
Expand Down

0 comments on commit e87dc84

Please sign in to comment.