From 814037b09e6854cf865ce165358e6f270ca95eba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=BDiga=20Kokelj?= Date: Wed, 27 Sep 2023 12:44:04 +0200 Subject: [PATCH] added handling subscriptionIDs --- go/rpc/client.go | 1 + .../accountmanager/account_manager.go | 7 ++++ .../subscriptions/subscriptions.go | 39 ++++++++++++++++--- 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/go/rpc/client.go b/go/rpc/client.go index d5ba448a12..768ffaa126 100644 --- a/go/rpc/client.go +++ b/go/rpc/client.go @@ -34,6 +34,7 @@ const ( Attestation = "obscuroscan_attestation" StopHost = "test_stopHost" Subscribe = "eth_subscribe" + Unsubscribe = "eth_unsubscribe" SubscribeNamespace = "eth" SubscriptionTypeLogs = "logs" diff --git a/tools/walletextension/accountmanager/account_manager.go b/tools/walletextension/accountmanager/account_manager.go index cdcbae3377..c2ab9c14fa 100644 --- a/tools/walletextension/accountmanager/account_manager.go +++ b/tools/walletextension/accountmanager/account_manager.go @@ -77,6 +77,13 @@ func (m *AccountManager) ProxyRequest(rpcReq *wecommon.RPCRequest, rpcResp *inte } } + // In some cases, we subscribe with each account current user added, and we also need to unsubscribe with all of them + if rpcReq.Method == rpc.Unsubscribe { + // TODO: The unsubscribe param is hash of concatenated subscriptions. + // We need to iterate over them and unsubscribe from all subscriptions + fmt.Println("we need to handle unsubscribe here") + } + return m.executeCall(rpcReq, rpcResp) } diff --git a/tools/walletextension/subscriptions/subscriptions.go b/tools/walletextension/subscriptions/subscriptions.go index 150055a606..8ee21ce61a 100644 --- a/tools/walletextension/subscriptions/subscriptions.go +++ b/tools/walletextension/subscriptions/subscriptions.go @@ -3,8 +3,11 @@ package subscriptions import ( "context" "fmt" + "strings" "time" + "github.com/ethereum/go-ethereum/crypto" + gethlog "github.com/ethereum/go-ethereum/log" gethrpc "github.com/ethereum/go-ethereum/rpc" "github.com/obscuronet/go-obscuro/go/common" @@ -34,11 +37,10 @@ func (s *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req * // save subscriptions subscriptions := make([]*gethrpc.ClientSubscription, 0, len(clients)) + subscriptionIDS := make([]string, 0, len(clients)) // TODO: Create a buffered channel and perform deduplication of logs or implement additional logic to filter logs - // TODO: Create response... - // Do periodic checks if userConn is closed and unsubscribe from all subscriptions for this user go func() { for { @@ -80,14 +82,39 @@ func (s *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req * s.logger.Info(fmt.Sprintf("Subscribing to: %d clients", len(clients))) for _, client := range clients { s.logger.Info(fmt.Sprintf("Subscribing for an event with client: %s", client)) - subscription, err := s.addSubscription(client, req, resp, commonChannel) + var subscriptionID interface{} + subscription, err := s.addSubscription(client, req, &subscriptionID, commonChannel) if err != nil { s.logger.Info(fmt.Sprintf("Error: %v", err)) + } else { + // If there was no error, the subscription was successful. Store it for unsubscribing in the future + subscriptions = append(subscriptions, subscription) + + // check if subscriptionID is of type string + if strSubscriptionID, ok := subscriptionID.(string); ok { + subscriptionIDS = append(subscriptionIDS, strSubscriptionID) + s.logger.Info(fmt.Sprintf("Subscribed with subscription ID: %s", subscriptionID)) + } else { + s.logger.Info(fmt.Sprintf("Error: invalid type of subscriptionID. Expected string, got: %T", subscriptionID)) + } } - subscriptions = append(subscriptions, subscription) - // TODO: get subscription ID - // s.logger.Info(fmt.Sprintf("Subscribed with subscription ID: %s", subscriptionID)) } + + // create a response with new subscriptionID by concatenating them and computing hash of the concatenated string + combinedSubscriptionIDS := strings.Join(subscriptionIDS, "") + // Compute Keccak-256 hash + subscriptionsIDHash := crypto.Keccak256([]byte(combinedSubscriptionIDS)) + // Convert hash to hex string for better readability + *resp = fmt.Sprintf("%x", subscriptionsIDHash) + + // TODO: + // We need to store subscriptionsIDHash and subscriptionIDS and have them available for unsubscribe - + // where is the best place to store them? + // - 1. option is database -> More complicated, + // can contain elements that are not relevant anymore in case of crashes, etd. + // - 2. option is in-memory storage, it si simpler, but will consume more RAM, + // easier to handle since on every crash/restart it is cleared (and also all the subscriptions are dropped by our logic) + return nil }