diff --git a/tools/walletextension/subscriptions/subscriptions.go b/tools/walletextension/subscriptions/subscriptions.go index 111c7a0032..75d980555f 100644 --- a/tools/walletextension/subscriptions/subscriptions.go +++ b/tools/walletextension/subscriptions/subscriptions.go @@ -1,13 +1,16 @@ package subscriptions import ( + "context" "fmt" gethlog "github.com/ethereum/go-ethereum/log" + gethrpc "github.com/ethereum/go-ethereum/rpc" "github.com/obscuronet/go-obscuro/go/common" "github.com/obscuronet/go-obscuro/go/common/log" "github.com/obscuronet/go-obscuro/go/rpc" wecommon "github.com/obscuronet/go-obscuro/tools/walletextension/common" "github.com/obscuronet/go-obscuro/tools/walletextension/userconn" + "time" ) type SubscriptionManager struct { @@ -28,9 +31,24 @@ func (s *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req * // create a chanel that will collect the data from all subscriptions commonChannel := make(chan common.IDAndLog) + // save subscriptions + var subscriptions []*gethrpc.ClientSubscription + // TODO: Create a buffered channel and perform deduplication of logs or implement additional logic to filter logs - // TODO: Do periodic checks if userConn is closed and unsubscribe from all subscriptions + //Do periodic checks if userConn is closed and unsubscribe from all subscriptions for this user + go func() { + for { + if userConn.IsClosed() { + for _, subscription := range subscriptions { + subscription.Unsubscribe() + } + subscriptions = []*gethrpc.ClientSubscription{} + return + } + time.Sleep(100 * time.Millisecond) + } + }() // Send all logs from common channel to user (via userConn) go func() { @@ -38,7 +56,7 @@ func (s *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req * select { case idAndLog := <-commonChannel: if userConn.IsClosed() { - // TODO (@ziga) - drop all the ws connections OG -> Obscuro Node, since we don't have user -> OG connection anymore + // Log that websocket was closed - unsubscribing is handled by periodic checks in separate goroutine s.logger.Info("received log but websocket was closed on subscription", log.SubIDKey, idAndLog.SubID) return } @@ -62,23 +80,25 @@ func (s *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req * s.logger.Info(fmt.Sprintf("Subscribing to: %d clients", len(clients))) for _, client := range clients { s.logger.Info(fmt.Sprintf("Subscribing for an event with client: %s", client)) - subscriptionID, err := s.addSubscription(client, req, commonChannel) + subscription, err := s.addSubscription(client, req, commonChannel) if err != nil { s.logger.Info(fmt.Sprintf("Error: %v", err)) } - s.logger.Info(fmt.Sprintf("Subscribed with subscription ID: %s", subscriptionID)) + subscriptions = append(subscriptions, subscription) + // TODO: get subscription ID + //s.logger.Info(fmt.Sprintf("Subscribed with subscription ID: %s", subscriptionID)) } return nil } -func (s *SubscriptionManager) addSubscription(client rpc.Client, req *wecommon.RPCRequest, commonChannel chan common.IDAndLog) (string, error) { +func (s *SubscriptionManager) addSubscription(client rpc.Client, req *wecommon.RPCRequest, commonChannel chan common.IDAndLog) (*gethrpc.ClientSubscription, error) { s.logger.Info(fmt.Sprintf("Subscribing client: %s for request: %s", client, req)) - // TODO: Create a subscription with Obscuro Node (with new web socket connection each time!) - - // TODO: - // Do something similar as in HandleNewSubscriptions - //and check for events comming to this channel and forward them to a common channel + // Subscribe using the provided rpc.Client, all subscriptions have the same channel + subscription, err := client.Subscribe(context.Background(), nil, rpc.SubscribeNamespace, commonChannel, req.Params...) + if err != nil { + return nil, fmt.Errorf("could not call %s with params %v. Cause: %w", req.Method, req.Params, err) + } - return "", nil + return subscription, nil }