Skip to content

Commit

Permalink
Subscribe to events with multiple accounts and deduplicate (#1627)
Browse files Browse the repository at this point in the history
  • Loading branch information
zkokelj authored Nov 6, 2023
1 parent 5c3fc3c commit 1d4e8d7
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 138 deletions.
2 changes: 1 addition & 1 deletion integration/obscurogateway/obscurogateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func testMultipleAccountsSubscription(t *testing.T, httpURL, wsURL string, w wal
// user1 should see three events (two lifecycle events - same as user0) and event with his interaction with setMessage
assert.Equal(t, 3, len(user1logs))
// user2 should see three events (two lifecycle events - same as user0) and event with his interaction with setMessage
assert.Equal(t, 2, len(user2logs))
assert.Equal(t, 3, len(user2logs))
}

func testAreTxsMinted(t *testing.T, httpURL, wsURL string, w wallet.Wallet) { //nolint: unused
Expand Down
22 changes: 0 additions & 22 deletions tools/walletextension/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ import (
"fmt"
"regexp"

"github.com/go-kit/kit/transport/http/jsonrpc"
"github.com/obscuronet/go-obscuro/go/common"

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/ecies"
"github.com/obscuronet/go-obscuro/go/common/viewingkey"
Expand Down Expand Up @@ -110,22 +107,3 @@ func (r *RPCRequest) Clone() *RPCRequest {
Params: r.Params,
}
}

// Formats the log to be sent as an Eth JSON-RPC response.
// TODO (@ziga) - Move this code to a subscriptions package once it is used only there..
func PrepareLogResponse(idAndLog common.IDAndLog) ([]byte, error) {
paramsMap := make(map[string]interface{})
paramsMap[JSONKeySubscription] = idAndLog.SubID
paramsMap[JSONKeyResult] = idAndLog.Log

respMap := make(map[string]interface{})
respMap[JSONKeyRPCVersion] = jsonrpc.Version
respMap[JSONKeyMethod] = methodEthSubscription
respMap[JSONKeyParams] = paramsMap

jsonResponse, err := json.Marshal(respMap)
if err != nil {
return nil, fmt.Errorf("could not marshal log response to JSON. Cause: %w", err)
}
return jsonResponse, nil
}
3 changes: 2 additions & 1 deletion tools/walletextension/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ const (
GetStorageAtUserIDRequestMethodName = "getUserID"
SuccessMsg = "success"
APIVersion1 = "/v1"
methodEthSubscription = "eth_subscription"
MethodEthSubscription = "eth_subscription"
PathVersion = "/version/"
DeduplicationBufferSize = 20
)

var ReaderHeadTimeout = 10 * time.Second
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package subscriptions

import "github.com/ethereum/go-ethereum/common"

// LogKey uniquely represents a log (consists of BlockHash, TxHash, and Index)
type LogKey struct {
BlockHash common.Hash // Not necessary, but can be helpful in edge case of block reorg.
TxHash common.Hash
Index uint
}

// CircularBuffer is a data structure that uses a single, fixed-size buffer as if it was connected end-to-end.
type CircularBuffer struct {
data []LogKey
size int
end int
}

// NewCircularBuffer initializes a new CircularBuffer of the given size.
func NewCircularBuffer(size int) *CircularBuffer {
return &CircularBuffer{
data: make([]LogKey, size),
size: size,
end: 0,
}
}

// Push adds a new LogKey to the end of the buffer. If the buffer is full,
// it overwrites the oldest data with the new LogKey.
func (cb *CircularBuffer) Push(key LogKey) {
cb.data[cb.end] = key
cb.end = (cb.end + 1) % cb.size
}

// Contains checks if the given LogKey exists in the buffer
func (cb *CircularBuffer) Contains(key LogKey) bool {
for _, item := range cb.data {
if item == key {
return true
}
}
return false
}
69 changes: 56 additions & 13 deletions tools/walletextension/subscriptions/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package subscriptions

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/go-kit/kit/transport/http/jsonrpc"

gethlog "github.com/ethereum/go-ethereum/log"
gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/obscuronet/go-obscuro/go/common"
Expand All @@ -17,6 +21,7 @@ import (
type SubscriptionManager struct {
subscriptionMappings map[string][]string
logger gethlog.Logger
mu sync.Mutex
}

func New(logger gethlog.Logger) *SubscriptionManager {
Expand All @@ -35,14 +40,16 @@ func (sm *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req

sm.logger.Info(fmt.Sprintf("Subscribing to event %s with %d clients", req.Params, len(clients)))

// create subscriptionID which will enable user to unsubscribe from all subscriptions
userSubscriptionID := gethrpc.NewID()

// create a common channel for subscriptions from all accounts
funnelMultipleAccountsChan := make(chan common.IDAndLog)

// read from a multiple accounts channel and write results to userConn
go readFromChannelAndWriteToUserConn(funnelMultipleAccountsChan, userConn, sm.logger)
go readFromChannelAndWriteToUserConn(funnelMultipleAccountsChan, userConn, userSubscriptionID, sm.logger)

// iterate over all clients and subscribe for each of them
// TODO: currently we use only first client (enabling subscriptions for all of them will be part of future PR)
for _, client := range clients {
subscription, err := client.Subscribe(context.Background(), resp, rpc.SubscribeNamespace, funnelMultipleAccountsChan, req.Params...)
if err != nil {
Expand All @@ -53,29 +60,43 @@ func (sm *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req
// TODO: test this feature in integration test
go checkIfUserConnIsClosedAndUnsubscribe(userConn, subscription)

// Add map subscriptionIDs
// Make a connection between subscriptionID returned from node for current request and subscriptionID returned to user
if currentNodeSubscriptionID, ok := (*resp).(string); ok {
// TODO (@ziga): Currently we use the same value for node and user subscriptionID - this will change after
// subscribing with multiple accounts
sm.UpdateSubscriptionMapping(currentNodeSubscriptionID, currentNodeSubscriptionID)

return nil
// TODO (@ziga)
// At this stage we want to use only the first account - same as before
// introduce subscribing with all accounts in another PR )
sm.UpdateSubscriptionMapping(string(userSubscriptionID), currentNodeSubscriptionID)
} else {
sm.logger.Error("Unable to read subscriptionID")
}
}

// We return subscriptionID with resp interface. We want to use userSubscriptionID to allow unsubscribing
*resp = userSubscriptionID
return nil
}

func readFromChannelAndWriteToUserConn(channel chan common.IDAndLog, userConn userconn.UserConn, logger gethlog.Logger) {
func readFromChannelAndWriteToUserConn(channel chan common.IDAndLog, userConn userconn.UserConn, userSubscriptionID gethrpc.ID, logger gethlog.Logger) {
buffer := NewCircularBuffer(wecommon.DeduplicationBufferSize)
for data := range channel {
jsonResponse, err := wecommon.PrepareLogResponse(data)
// create unique identifier for current log
uniqueLogKey := LogKey{
BlockHash: data.Log.BlockHash,
TxHash: data.Log.TxHash,
Index: data.Log.Index,
}

// check if the current event is a duplicate (and skip it if it is)
if buffer.Contains(uniqueLogKey) {
continue
}

jsonResponse, err := prepareLogResponse(data, userSubscriptionID)
if err != nil {
logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, data.SubID, log.ErrKey, err)
continue
}

// the current log is unique, and we want to add it to our buffer and proceed with forwarding to the user
buffer.Push(uniqueLogKey)

logger.Trace(fmt.Sprintf("Forwarding log from Obscuro node: %s", jsonResponse), log.SubIDKey, data.SubID)
err = userConn.WriteResponse(jsonResponse)
if err != nil {
Expand All @@ -96,6 +117,10 @@ func checkIfUserConnIsClosedAndUnsubscribe(userConn userconn.UserConn, subscript
}

func (sm *SubscriptionManager) UpdateSubscriptionMapping(userSubscriptionID string, obscuroNodeSubscriptionID string) {
// Ensure there is no concurrent map writes
sm.mu.Lock()
defer sm.mu.Unlock()

existingUserIDs, exists := sm.subscriptionMappings[userSubscriptionID]

if !exists {
Expand All @@ -116,3 +141,21 @@ func (sm *SubscriptionManager) UpdateSubscriptionMapping(userSubscriptionID stri
sm.subscriptionMappings[userSubscriptionID] = append(existingUserIDs, obscuroNodeSubscriptionID)
}
}

// Formats the log to be sent as an Eth JSON-RPC response.
func prepareLogResponse(idAndLog common.IDAndLog, userSubscriptionID gethrpc.ID) ([]byte, error) {
paramsMap := make(map[string]interface{})
paramsMap[wecommon.JSONKeySubscription] = userSubscriptionID
paramsMap[wecommon.JSONKeyResult] = idAndLog.Log

respMap := make(map[string]interface{})
respMap[wecommon.JSONKeyRPCVersion] = jsonrpc.Version
respMap[wecommon.JSONKeyMethod] = wecommon.MethodEthSubscription
respMap[wecommon.JSONKeyParams] = paramsMap

jsonResponse, err := json.Marshal(respMap)
if err != nil {
return nil, fmt.Errorf("could not marshal log response to JSON. Cause: %w", err)
}
return jsonResponse, nil
}
99 changes: 48 additions & 51 deletions tools/walletextension/test/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"io"
"net/http"
"os"
"regexp"
"testing"
"time"

Expand Down Expand Up @@ -303,50 +302,50 @@ func issueRequestWS(conn *websocket.Conn, body []byte) []byte {
}

// Reads messages from the connection for the provided duration, and returns the read messages.
func readMessagesForDuration(t *testing.T, conn *websocket.Conn, duration time.Duration) [][]byte {
// We set a timeout to kill the test, in case we never receive a log.
timeout := time.AfterFunc(duration*3, func() {
t.Fatalf("timed out waiting to receive a log via the subscription")
})
defer timeout.Stop()

var msgs [][]byte
endTime := time.Now().Add(duration)
for {
_, msg, err := conn.ReadMessage()
if err != nil {
t.Fatalf("could not read message from websocket. Cause: %s", err)
}
msgs = append(msgs, msg)
if time.Now().After(endTime) {
return msgs
}
}
}
//func readMessagesForDuration(t *testing.T, conn *websocket.Conn, duration time.Duration) [][]byte {
// // We set a timeout to kill the test, in case we never receive a log.
// timeout := time.AfterFunc(duration*3, func() {
// t.Fatalf("timed out waiting to receive a log via the subscription")
// })
// defer timeout.Stop()
//
// var msgs [][]byte
// endTime := time.Now().Add(duration)
// for {
// _, msg, err := conn.ReadMessage()
// if err != nil {
// t.Fatalf("could not read message from websocket. Cause: %s", err)
// }
// msgs = append(msgs, msg)
// if time.Now().After(endTime) {
// return msgs
// }
// }
//}

// Asserts that there are no duplicate logs in the provided list.
func assertNoDupeLogs(t *testing.T, logsJSON [][]byte) {
logCount := make(map[string]int)

for _, logJSON := range logsJSON {
// Check if the log is already in the logCount map.
_, exist := logCount[string(logJSON)]
if exist {
logCount[string(logJSON)]++ // If it is, increase the count for that log by one.
} else {
logCount[string(logJSON)] = 1 // Otherwise, start a count for that log starting at one.
}
}

for logJSON, count := range logCount {
if count > 1 {
t.Errorf("received duplicate log with body %s", logJSON)
}
}
}
//func assertNoDupeLogs(t *testing.T, logsJSON [][]byte) {
// logCount := make(map[string]int)
//
// for _, logJSON := range logsJSON {
// // Check if the log is already in the logCount map.
// _, exist := logCount[string(logJSON)]
// if exist {
// logCount[string(logJSON)]++ // If it is, increase the count for that log by one.
// } else {
// logCount[string(logJSON)] = 1 // Otherwise, start a count for that log starting at one.
// }
// }
//
// for logJSON, count := range logCount {
// if count > 1 {
// t.Errorf("received duplicate log with body %s", logJSON)
// }
// }
//}

// Checks that the response to a request is correctly formatted, and returns the result field.
func validateJSONResponse(t *testing.T, resp []byte) interface{} {
func validateJSONResponse(t *testing.T, resp []byte) {
var respJSON map[string]interface{}
err := json.Unmarshal(resp, &respJSON)
if err != nil {
Expand All @@ -366,16 +365,14 @@ func validateJSONResponse(t *testing.T, resp []byte) interface{} {
if result == nil {
t.Fatalf("response did not contain `result` field")
}

return result
}

// Checks that the response to a subscription request is correctly formatted.
func validateSubscriptionResponse(t *testing.T, resp []byte) {
result := validateJSONResponse(t, resp)
pattern := "0x.*"
resultString, ok := result.(string)
if !ok || !regexp.MustCompile(pattern).MatchString(resultString) {
t.Fatalf("subscription response did not contain expected result. Expected pattern matching %s, got %s", pattern, resultString)
}
}
//func validateSubscriptionResponse(t *testing.T, resp []byte) {
// result := validateJSONResponse(t, resp)
// pattern := "0x.*"
// resultString, ok := result.(string)
// if !ok || !regexp.MustCompile(pattern).MatchString(resultString) {
// t.Fatalf("subscription response did not contain expected result. Expected pattern matching %s, got %s", pattern, resultString)
// }
//}
Loading

0 comments on commit 1d4e8d7

Please sign in to comment.