Skip to content

Commit

Permalink
call subscriptions for all clients
Browse files Browse the repository at this point in the history
  • Loading branch information
zkokelj committed Sep 19, 2023
1 parent 650e499 commit 66e8005
Showing 1 changed file with 31 additions and 11 deletions.
42 changes: 31 additions & 11 deletions tools/walletextension/subscriptions/subscriptions.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package subscriptions

import (
"context"
"fmt"
gethlog "github.com/ethereum/go-ethereum/log"
gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/obscuronet/go-obscuro/go/common"
"github.com/obscuronet/go-obscuro/go/common/log"
"github.com/obscuronet/go-obscuro/go/rpc"
wecommon "github.com/obscuronet/go-obscuro/tools/walletextension/common"
"github.com/obscuronet/go-obscuro/tools/walletextension/userconn"
"time"
)

type SubscriptionManager struct {
Expand All @@ -28,17 +31,32 @@ func (s *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req *
// create a chanel that will collect the data from all subscriptions
commonChannel := make(chan common.IDAndLog)

// save subscriptions
var subscriptions []*gethrpc.ClientSubscription

// TODO: Create a buffered channel and perform deduplication of logs or implement additional logic to filter logs

// TODO: Do periodic checks if userConn is closed and unsubscribe from all subscriptions
//Do periodic checks if userConn is closed and unsubscribe from all subscriptions for this user
go func() {
for {
if userConn.IsClosed() {
for _, subscription := range subscriptions {
subscription.Unsubscribe()
}
subscriptions = []*gethrpc.ClientSubscription{}
return
}
time.Sleep(100 * time.Millisecond)
}
}()

// Send all logs from common channel to user (via userConn)
go func() {
for {
select {
case idAndLog := <-commonChannel:
if userConn.IsClosed() {
// TODO (@ziga) - drop all the ws connections OG -> Obscuro Node, since we don't have user -> OG connection anymore
// Log that websocket was closed - unsubscribing is handled by periodic checks in separate goroutine
s.logger.Info("received log but websocket was closed on subscription", log.SubIDKey, idAndLog.SubID)
return
}
Expand All @@ -62,23 +80,25 @@ func (s *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req *
s.logger.Info(fmt.Sprintf("Subscribing to: %d clients", len(clients)))
for _, client := range clients {
s.logger.Info(fmt.Sprintf("Subscribing for an event with client: %s", client))
subscriptionID, err := s.addSubscription(client, req, commonChannel)
subscription, err := s.addSubscription(client, req, commonChannel)
if err != nil {
s.logger.Info(fmt.Sprintf("Error: %v", err))
}
s.logger.Info(fmt.Sprintf("Subscribed with subscription ID: %s", subscriptionID))
subscriptions = append(subscriptions, subscription)
// TODO: get subscription ID
//s.logger.Info(fmt.Sprintf("Subscribed with subscription ID: %s", subscriptionID))
}
return nil
}

func (s *SubscriptionManager) addSubscription(client rpc.Client, req *wecommon.RPCRequest, commonChannel chan common.IDAndLog) (string, error) {
func (s *SubscriptionManager) addSubscription(client rpc.Client, req *wecommon.RPCRequest, commonChannel chan common.IDAndLog) (*gethrpc.ClientSubscription, error) {
s.logger.Info(fmt.Sprintf("Subscribing client: %s for request: %s", client, req))

// TODO: Create a subscription with Obscuro Node (with new web socket connection each time!)

// TODO:
// Do something similar as in HandleNewSubscriptions
//and check for events comming to this channel and forward them to a common channel
// Subscribe using the provided rpc.Client, all subscriptions have the same channel
subscription, err := client.Subscribe(context.Background(), nil, rpc.SubscribeNamespace, commonChannel, req.Params...)
if err != nil {
return nil, fmt.Errorf("could not call %s with params %v. Cause: %w", req.Method, req.Params, err)
}

return "", nil
return subscription, nil
}

0 comments on commit 66e8005

Please sign in to comment.