-
Notifications
You must be signed in to change notification settings - Fork 39
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
169 additions
and
56 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
package subscriptions | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
gethlog "github.com/ethereum/go-ethereum/log" | ||
gethrpc "github.com/ethereum/go-ethereum/rpc" | ||
"github.com/obscuronet/go-obscuro/go/common" | ||
"github.com/obscuronet/go-obscuro/go/common/log" | ||
"github.com/obscuronet/go-obscuro/go/rpc" | ||
wecommon "github.com/obscuronet/go-obscuro/tools/walletextension/common" | ||
"github.com/obscuronet/go-obscuro/tools/walletextension/userconn" | ||
) | ||
|
||
type SubscriptionManager struct { | ||
logger gethlog.Logger | ||
} | ||
|
||
func New(logger gethlog.Logger) *SubscriptionManager { | ||
return &SubscriptionManager{ | ||
logger: logger, | ||
} | ||
} | ||
|
||
func (s *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req *wecommon.RPCRequest, resp *interface{}, userConn userconn.UserConn) error { | ||
if len(req.Params) == 0 { | ||
return fmt.Errorf("could not subscribe as no subscription namespace was provided") | ||
} | ||
|
||
// create a chanel that will collect the data from all subscriptions | ||
commonChannel := make(chan common.IDAndLog) | ||
|
||
// save subscriptions | ||
subscriptions := make([]*gethrpc.ClientSubscription, 0, len(clients)) | ||
|
||
// TODO: Create a buffered channel and perform deduplication of logs or implement additional logic to filter logs | ||
|
||
// TODO: Create response... | ||
|
||
// Do periodic checks if userConn is closed and unsubscribe from all subscriptions for this user | ||
go func() { | ||
for { | ||
if userConn.IsClosed() { | ||
for _, subscription := range subscriptions { | ||
subscription.Unsubscribe() | ||
} | ||
subscriptions = []*gethrpc.ClientSubscription{} | ||
return | ||
} | ||
time.Sleep(100 * time.Millisecond) | ||
} | ||
}() | ||
|
||
// Send all logs from common channel to user (via userConn) | ||
go func() { | ||
for idAndLog := range commonChannel { | ||
if userConn.IsClosed() { | ||
// Log that websocket was closed - unsubscribing is handled by periodic checks in separate goroutine | ||
s.logger.Info("received log but websocket was closed on subscription", log.SubIDKey, idAndLog.SubID) | ||
return | ||
} | ||
|
||
jsonResponse, err := wecommon.PrepareLogResponse(idAndLog) | ||
if err != nil { | ||
s.logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) | ||
continue | ||
} | ||
|
||
err = userConn.WriteResponse(jsonResponse) | ||
if err != nil { | ||
s.logger.Error("could not write the JSON log to the websocket on subscription %", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) | ||
continue | ||
} | ||
} | ||
}() | ||
|
||
// loop over all clients and create a new subscription for each of them | ||
s.logger.Info(fmt.Sprintf("Subscribing to: %d clients", len(clients))) | ||
for _, client := range clients { | ||
s.logger.Info(fmt.Sprintf("Subscribing for an event with client: %s", client)) | ||
subscription, err := s.addSubscription(client, req, resp, commonChannel) | ||
if err != nil { | ||
s.logger.Info(fmt.Sprintf("Error: %v", err)) | ||
} | ||
subscriptions = append(subscriptions, subscription) | ||
// TODO: get subscription ID | ||
// s.logger.Info(fmt.Sprintf("Subscribed with subscription ID: %s", subscriptionID)) | ||
} | ||
return nil | ||
} | ||
|
||
func (s *SubscriptionManager) addSubscription(client rpc.Client, req *wecommon.RPCRequest, rpcResp *interface{}, commonChannel chan common.IDAndLog) (*gethrpc.ClientSubscription, error) { | ||
s.logger.Info(fmt.Sprintf("Subscribing client: %s for request: %s", client, req)) | ||
|
||
// Subscribe using the provided rpc.Client, all subscriptions have the same channel | ||
subscription, err := client.Subscribe(context.Background(), rpcResp, rpc.SubscribeNamespace, commonChannel, req.Params...) | ||
if err != nil { | ||
return nil, fmt.Errorf("could not call %s with params %v. Cause: %w", req.Method, req.Params, err) | ||
} | ||
|
||
return subscription, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters