From e877138f3d00cf15e13ae041c539b6f992086f9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=BDiga=20Kokelj?= Date: Fri, 20 Oct 2023 14:40:53 +0200 Subject: [PATCH] pair_programming_code --- .../obscurogateway/obscurogateway_test.go | 7 +- .../accountmanager/account_manager.go | 124 +++++----- .../subscriptions/subscriptions.go | 218 ++++++++++++------ 3 files changed, 218 insertions(+), 131 deletions(-) diff --git a/integration/obscurogateway/obscurogateway_test.go b/integration/obscurogateway/obscurogateway_test.go index 6b4cb65d67..afd752d650 100644 --- a/integration/obscurogateway/obscurogateway_test.go +++ b/integration/obscurogateway/obscurogateway_test.go @@ -345,6 +345,7 @@ func TestObscuroGatewaySubscriptionsWithMultipleAccounts(t *testing.T) { fmt.Printf("Tx1 was included in block %d\n", intTx1Receipt.BlockNumber) fmt.Printf("Tx2 was included in block %d\n", intTx2Receipt.BlockNumber) + time.Sleep(30 * time.Second) fmt.Println("user0 received logs: ", len(user0logs)) fmt.Println("user1 received logs: ", len(user1logs)) fmt.Println("user2 received logs: ", len(user2logs)) @@ -360,6 +361,7 @@ func TestObscuroGatewaySubscriptionsWithMultipleAccounts(t *testing.T) { // FIXME: Currently we receive only 2 events, because only the first account on each client actually subscribe // assert.Equal(t, 3, len(user2logs)) + time.Sleep(time.Hour) // Gracefully shutdown err = obscuroGwContainer.Stop() assert.NoError(t, err) @@ -619,9 +621,10 @@ func subscribeToEvents(addresses []gethcommon.Address, topics [][]gethcommon.Has case err := <-subscription.Err(): fmt.Printf("Error from logs subscription: %v\n", err) return - case log := <-logsCh: + case log2 := <-logsCh: + fmt.Println(log2) // append logs to be visible from the main thread - *logs = append(*logs, log) + *logs = append(*logs, log2) } } }() diff --git a/tools/walletextension/accountmanager/account_manager.go b/tools/walletextension/accountmanager/account_manager.go index 87333e9d3c..cf071e8588 100644 --- a/tools/walletextension/accountmanager/account_manager.go +++ b/tools/walletextension/accountmanager/account_manager.go @@ -1,12 +1,14 @@ package accountmanager import ( + "context" "encoding/json" "errors" "fmt" - "strings" - + "github.com/obscuronet/go-obscuro/go/common/log" "github.com/obscuronet/go-obscuro/tools/walletextension/subscriptions" + "strings" + "time" "github.com/ethereum/go-ethereum/eth/filters" @@ -70,7 +72,7 @@ func (m *AccountManager) ProxyRequest(rpcReq *wecommon.RPCRequest, rpcResp *inte } return err - //// fetch the clients from a topic (todo: @ziga - delete it) + // fetch the clients from a topic (todo: @ziga - delete it) //for _, client := range clients { // return m.executeSubscribe(client, rpcReq, rpcResp, userConn) //} @@ -279,64 +281,64 @@ func searchDataFieldForAccount(callParams map[string]interface{}, accClients map return nil, fmt.Errorf("no known account found in data bytes") } -//func (m *AccountManager) executeSubscribe(client rpc.Client, req *wecommon.RPCRequest, resp *interface{}, userConn userconn.UserConn) error { //nolint: gocognit -// if len(req.Params) == 0 { -// return fmt.Errorf("could not subscribe as no subscription namespace was provided") -// } -// m.logger.Info(fmt.Sprintf("Subscribing client: %s for request: %s", client, req)) -// ch := make(chan common.IDAndLog) -// subscription, err := client.Subscribe(context.Background(), resp, rpc.SubscribeNamespace, ch, req.Params...) -// if err != nil { -// return fmt.Errorf("could not call %s with params %v. Cause: %w", req.Method, req.Params, err) -// } -// -// // We listen for incoming messages on the subscription. -// go func() { -// for { -// select { -// case idAndLog := <-ch: -// if userConn.IsClosed() { -// m.logger.Info("received log but websocket was closed on subscription", log.SubIDKey, idAndLog.SubID) -// return -// } -// -// jsonResponse, err := wecommon.PrepareLogResponse(idAndLog) -// if err != nil { -// m.logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) -// continue -// } -// -// m.logger.Trace(fmt.Sprintf("Forwarding log from Obscuro node: %s", jsonResponse), log.SubIDKey, idAndLog.SubID) -// err = userConn.WriteResponse(jsonResponse) -// if err != nil { -// m.logger.Error("could not write the JSON log to the websocket on subscription %", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) -// continue -// } -// -// case err = <-subscription.Err(): -// // An error on this channel means the subscription has ended, so we exit the loop. -// if userConn != nil && err != nil { -// userConn.HandleError(err.Error()) -// } -// -// return -// } -// } -// }() -// -// // We periodically check if the websocket is closed, and terminate the subscription. -// go func() { -// for { -// if userConn.IsClosed() { -// subscription.Unsubscribe() -// return -// } -// time.Sleep(100 * time.Millisecond) -// } -// }() -// -// return nil -//} +func (m *AccountManager) executeSubscribe(client rpc.Client, req *wecommon.RPCRequest, resp *interface{}, userConn userconn.UserConn) error { //nolint: gocognit + if len(req.Params) == 0 { + return fmt.Errorf("could not subscribe as no subscription namespace was provided") + } + m.logger.Info(fmt.Sprintf("Subscribing client: %s for request: %s", client, req)) + ch := make(chan common.IDAndLog) + subscription, err := client.Subscribe(context.Background(), resp, rpc.SubscribeNamespace, ch, req.Params...) + if err != nil { + return fmt.Errorf("could not call %s with params %v. Cause: %w", req.Method, req.Params, err) + } + + // We listen for incoming messages on the subscription. + go func() { + for { + select { + case idAndLog := <-ch: + if userConn.IsClosed() { + m.logger.Info("received log but websocket was closed on subscription", log.SubIDKey, idAndLog.SubID) + return + } + + jsonResponse, err := wecommon.PrepareLogResponse(idAndLog) + if err != nil { + m.logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) + continue + } + + m.logger.Trace(fmt.Sprintf("Forwarding log from Obscuro node: %s", jsonResponse), log.SubIDKey, idAndLog.SubID) + err = userConn.WriteResponse(jsonResponse) + if err != nil { + m.logger.Error("could not write the JSON log to the websocket on subscription %", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) + continue + } + + case err = <-subscription.Err(): + // An error on this channel means the subscription has ended, so we exit the loop. + if userConn != nil && err != nil { + userConn.HandleError(err.Error()) + } + + return + } + } + }() + + // We periodically check if the websocket is closed, and terminate the subscription. + go func() { + for { + if userConn.IsClosed() { + subscription.Unsubscribe() + return + } + time.Sleep(100 * time.Millisecond) + } + }() + + return nil +} func submitCall(client *rpc.EncRPCClient, req *wecommon.RPCRequest, resp *interface{}) error { if req.Method == rpc.Call || req.Method == rpc.EstimateGas { diff --git a/tools/walletextension/subscriptions/subscriptions.go b/tools/walletextension/subscriptions/subscriptions.go index 6a342d5552..059a8e37d5 100644 --- a/tools/walletextension/subscriptions/subscriptions.go +++ b/tools/walletextension/subscriptions/subscriptions.go @@ -3,11 +3,8 @@ package subscriptions import ( "context" "fmt" - "strings" "time" - "github.com/ethereum/go-ethereum/crypto" - gethlog "github.com/ethereum/go-ethereum/log" gethrpc "github.com/ethereum/go-ethereum/rpc" "github.com/obscuronet/go-obscuro/go/common" @@ -33,87 +30,172 @@ func (s *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req * if len(req.Params) == 0 { return fmt.Errorf("could not subscribe as no subscription namespace was provided") } + s.logger.Info(fmt.Sprintf("Subscribing client: %s for request: %s", clients[0], req)) + //ch := make(chan common.IDAndLog) - // create a chanel that will collect the data from all subscriptions - commonChannel := make(chan common.IDAndLog) - - // save subscriptions - subscriptions := make([]*gethrpc.ClientSubscription, 0, len(clients)) - subscriptionIDS := make([]string, 0, len(clients)) + funnelMultipleAccountsChan := make(chan common.IDAndLog) // weary of 1 msg at a time - // TODO: Create a buffered channel and perform deduplication of logs or implement additional logic to filter logs - - // Do periodic checks if userConn is closed and unsubscribe from all subscriptions for this user go func() { for { - if userConn.IsClosed() { - for _, subscription := range subscriptions { - subscription.Unsubscribe() + select { + case data := <-funnelMultipleAccountsChan: + jsonResponse, err := wecommon.PrepareLogResponse(data) + if err != nil { + s.logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, data.SubID, log.ErrKey, err) + continue } - subscriptions = []*gethrpc.ClientSubscription{} - return - } - time.Sleep(100 * time.Millisecond) - } - }() - // Send all logs from common channel to user (via userConn) - go func() { - for idAndLog := range commonChannel { - if userConn.IsClosed() { - // Log that websocket was closed - unsubscribing is handled by periodic checks in separate goroutine - s.logger.Info("received log but websocket was closed on subscription", log.SubIDKey, idAndLog.SubID) - return - } + s.logger.Trace(fmt.Sprintf("Forwarding log from Obscuro node: %s", jsonResponse), log.SubIDKey, data.SubID) + err = userConn.WriteResponse(jsonResponse) + if err != nil { + s.logger.Error("could not write the JSON log to the websocket on subscription %", log.SubIDKey, data.SubID, log.ErrKey, err) + continue + } - jsonResponse, err := wecommon.PrepareLogResponse(idAndLog) - fmt.Println("We have a log: ", string(jsonResponse)) - if err != nil { - s.logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) - continue - } - err = userConn.WriteResponse(jsonResponse) - if err != nil { - s.logger.Error("could not write the JSON log to the websocket on subscription %", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) - continue } } }() - // loop over all clients and create a new subscription for each of them - s.logger.Info(fmt.Sprintf("Subscribing to: %d clients", len(clients))) for _, client := range clients { - s.logger.Info(fmt.Sprintf("Subscribing for an event with client: %s", client)) - fmt.Println("Subscribing to logs with client: ", client) - var subscriptionID interface{} - subscription, err := s.addSubscription(client, req, &subscriptionID, commonChannel) - strSubscriptionID, isOK := subscriptionID.(string) - if err != nil || !isOK { - s.logger.Info(fmt.Sprintf("Error subscribing: %v", err)) - continue + subscription, err := client.Subscribe(context.Background(), resp, rpc.SubscribeNamespace, funnelMultipleAccountsChan, req.Params...) + if err != nil { + return fmt.Errorf("could not call %s with params %v. Cause: %w", req.Method, req.Params, err) } - // If there was no error, the subscription was successful. Store it for unsubscribing in the future - subscriptions = append(subscriptions, subscription) - subscriptionIDS = append(subscriptionIDS, strSubscriptionID) - s.logger.Info(fmt.Sprintf("Subscribed with subscription ID: %s", subscriptionID)) - } - // create a response with new subscriptionID by concatenating them and computing hash of the concatenated string - combinedSubscriptionIDS := strings.Join(subscriptionIDS, "") - // Compute Keccak-256 hash - subscriptionsIDHash := crypto.Keccak256([]byte(combinedSubscriptionIDS)) - // Convert hash to hex string for better readability - *resp = fmt.Sprintf("%x", subscriptionsIDHash) - - // TODO: - // We need to store subscriptionsIDHash and subscriptionIDS and have them available for unsubscribe - - // where is the best place to store them? - // - 1. option is database -> More complicated, - // can contain elements that are not relevant anymore in case of crashes, etd. - // - 2. option is in-memory storage, it si simpler, but will consume more RAM, - // easier to handle since on every crash/restart it is cleared (and also all the subscriptions are dropped by our logic) + // We listen for incoming messages on the subscription. + //go func() { + // for { + // select { + // case idAndLog := <-ch: + // if userConn.IsClosed() { + // s.logger.Info("received log but websocket was closed on subscription", log.SubIDKey, idAndLog.SubID) + // return + // } + // + // funnelMultipleAccountsChan <- idAndLog + // + // case err = <-subscription.Err(): + // // An error on this channel means the subscription has ended, so we exit the loop. + // if userConn != nil && err != nil { + // userConn.HandleError(err.Error()) + // } + // + // return + // } + // } + //}() + + // We periodically check if the websocket is closed, and terminate the subscription. + go func() { + for { + if userConn.IsClosed() { + subscription.Unsubscribe() + return + } + time.Sleep(100 * time.Millisecond) + } + }() + } return nil + // + //if len(req.Params) == 0 { + // return fmt.Errorf("could not subscribe as no subscription namespace was provided") + //} + // + //// create a chanel that will collect the data from all subscriptions + //commonChannel := make(chan common.IDAndLog) + // + //// save subscriptions + //subscriptions := make([]*gethrpc.ClientSubscription, 0, len(clients)) + //subscriptionIDS := make([]string, 0, len(clients)) + // + //// TODO: Create a buffered channel and perform deduplication of logs or implement additional logic to filter logs + // + //// Do periodic checks if userConn is closed and unsubscribe from all subscriptions for this user + //go func() { + // for { + // if userConn.IsClosed() { + // for _, subscription := range subscriptions { + // subscription.Unsubscribe() + // } + // subscriptions = []*gethrpc.ClientSubscription{} + // return + // } + // time.Sleep(100 * time.Millisecond) + // } + //}() + // + //// Send all logs from common channel to user (via userConn) + //go func() { + // for { + // select { + // case idAndLog := <-commonChannel: + // if userConn.IsClosed() { + // // Log that websocket was closed - unsubscribing is handled by periodic checks in separate goroutine + // s.logger.Info("received log but websocket was closed on subscription", log.SubIDKey, idAndLog.SubID) + // return + // } + // + // jsonResponse, err := wecommon.PrepareLogResponse(idAndLog) + // fmt.Println("We have a log: ", string(jsonResponse)) + // if err != nil { + // s.logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) + // continue + // } + // err = userConn.WriteResponse(jsonResponse) + // if err != nil { + // s.logger.Error("could not write the JSON log to the websocket on subscription %", log.SubIDKey, idAndLog.SubID, log.ErrKey, err) + // continue + // } + // } + // } + //}() + // + //// loop over all clients and create a new subscription for each of them + //s.logger.Info(fmt.Sprintf("Subscribing to: %d clients", len(clients))) + //for _, client := range clients { + // s.logger.Info(fmt.Sprintf("Subscribing for an event with client: %s", client)) + // fmt.Println("Subscribing to logs with client: ", client) + // var subscriptionID interface{} + // subChannel := make(chan common.IDAndLog) + // subscription, err := s.addSubscription(client, req, &subscriptionID, subChannel) + // strSubscriptionID, isOK := subscriptionID.(string) + // if err != nil || !isOK { + // s.logger.Info(fmt.Sprintf("Error subscribing: %v", err)) + // continue + // } + // // If there was no error, the subscription was successful. Store it for unsubscribing in the future + // subscriptions = append(subscriptions, subscription) + // subscriptionIDS = append(subscriptionIDS, strSubscriptionID) + // s.logger.Info(fmt.Sprintf("Subscribed with subscription ID: %s", subscriptionID)) + // + // go func() { + // for { + // select { + // case data := <-subChannel: + // commonChannel <- data + // } + // } + // }() + //} + // + //// create a response with new subscriptionID by concatenating them and computing hash of the concatenated string + //combinedSubscriptionIDS := strings.Join(subscriptionIDS, "") + //// Compute Keccak-256 hash + //subscriptionsIDHash := crypto.Keccak256([]byte(combinedSubscriptionIDS)) + //// Convert hash to hex string for better readability + //*resp = fmt.Sprintf("%x", subscriptionsIDHash) + // + //// TODO: + //// We need to store subscriptionsIDHash and subscriptionIDS and have them available for unsubscribe - + //// where is the best place to store them? + //// - 1. option is database -> More complicated, + //// can contain elements that are not relevant anymore in case of crashes, etd. + //// - 2. option is in-memory storage, it si simpler, but will consume more RAM, + //// easier to handle since on every crash/restart it is cleared (and also all the subscriptions are dropped by our logic) + // + //return nil } func (s *SubscriptionManager) addSubscription(client rpc.Client, req *wecommon.RPCRequest, rpcResp *interface{}, commonChannel chan common.IDAndLog) (*gethrpc.ClientSubscription, error) {