Skip to content

Commit

Permalink
pair_programming_code
Browse files Browse the repository at this point in the history
  • Loading branch information
zkokelj committed Oct 20, 2023
1 parent 206f45a commit e877138
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 131 deletions.
7 changes: 5 additions & 2 deletions integration/obscurogateway/obscurogateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ func TestObscuroGatewaySubscriptionsWithMultipleAccounts(t *testing.T) {
fmt.Printf("Tx1 was included in block %d\n", intTx1Receipt.BlockNumber)
fmt.Printf("Tx2 was included in block %d\n", intTx2Receipt.BlockNumber)

time.Sleep(30 * time.Second)
fmt.Println("user0 received logs: ", len(user0logs))
fmt.Println("user1 received logs: ", len(user1logs))
fmt.Println("user2 received logs: ", len(user2logs))
Expand All @@ -360,6 +361,7 @@ func TestObscuroGatewaySubscriptionsWithMultipleAccounts(t *testing.T) {
// FIXME: Currently we receive only 2 events, because only the first account on each client actually subscribe
// assert.Equal(t, 3, len(user2logs))

time.Sleep(time.Hour)
// Gracefully shutdown
err = obscuroGwContainer.Stop()
assert.NoError(t, err)
Expand Down Expand Up @@ -619,9 +621,10 @@ func subscribeToEvents(addresses []gethcommon.Address, topics [][]gethcommon.Has
case err := <-subscription.Err():
fmt.Printf("Error from logs subscription: %v\n", err)
return
case log := <-logsCh:
case log2 := <-logsCh:
fmt.Println(log2)
// append logs to be visible from the main thread
*logs = append(*logs, log)
*logs = append(*logs, log2)
}
}
}()
Expand Down
124 changes: 63 additions & 61 deletions tools/walletextension/accountmanager/account_manager.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package accountmanager

import (
"context"
"encoding/json"
"errors"
"fmt"

Check failure on line 7 in tools/walletextension/accountmanager/account_manager.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
"strings"

"github.com/obscuronet/go-obscuro/go/common/log"

Check failure on line 8 in tools/walletextension/accountmanager/account_manager.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed (goimports)
"github.com/obscuronet/go-obscuro/tools/walletextension/subscriptions"
"strings"
"time"

"github.com/ethereum/go-ethereum/eth/filters"

Expand Down Expand Up @@ -70,7 +72,7 @@ func (m *AccountManager) ProxyRequest(rpcReq *wecommon.RPCRequest, rpcResp *inte
}
return err

//// fetch the clients from a topic (todo: @ziga - delete it)
// fetch the clients from a topic (todo: @ziga - delete it)
//for _, client := range clients {
// return m.executeSubscribe(client, rpcReq, rpcResp, userConn)
//}
Expand Down Expand Up @@ -279,64 +281,64 @@ func searchDataFieldForAccount(callParams map[string]interface{}, accClients map
return nil, fmt.Errorf("no known account found in data bytes")
}

//func (m *AccountManager) executeSubscribe(client rpc.Client, req *wecommon.RPCRequest, resp *interface{}, userConn userconn.UserConn) error { //nolint: gocognit
// if len(req.Params) == 0 {
// return fmt.Errorf("could not subscribe as no subscription namespace was provided")
// }
// m.logger.Info(fmt.Sprintf("Subscribing client: %s for request: %s", client, req))
// ch := make(chan common.IDAndLog)
// subscription, err := client.Subscribe(context.Background(), resp, rpc.SubscribeNamespace, ch, req.Params...)
// if err != nil {
// return fmt.Errorf("could not call %s with params %v. Cause: %w", req.Method, req.Params, err)
// }
//
// // We listen for incoming messages on the subscription.
// go func() {
// for {
// select {
// case idAndLog := <-ch:
// if userConn.IsClosed() {
// m.logger.Info("received log but websocket was closed on subscription", log.SubIDKey, idAndLog.SubID)
// return
// }
//
// jsonResponse, err := wecommon.PrepareLogResponse(idAndLog)
// if err != nil {
// m.logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, idAndLog.SubID, log.ErrKey, err)
// continue
// }
//
// m.logger.Trace(fmt.Sprintf("Forwarding log from Obscuro node: %s", jsonResponse), log.SubIDKey, idAndLog.SubID)
// err = userConn.WriteResponse(jsonResponse)
// if err != nil {
// m.logger.Error("could not write the JSON log to the websocket on subscription %", log.SubIDKey, idAndLog.SubID, log.ErrKey, err)
// continue
// }
//
// case err = <-subscription.Err():
// // An error on this channel means the subscription has ended, so we exit the loop.
// if userConn != nil && err != nil {
// userConn.HandleError(err.Error())
// }
//
// return
// }
// }
// }()
//
// // We periodically check if the websocket is closed, and terminate the subscription.
// go func() {
// for {
// if userConn.IsClosed() {
// subscription.Unsubscribe()
// return
// }
// time.Sleep(100 * time.Millisecond)
// }
// }()
//
// return nil
//}
func (m *AccountManager) executeSubscribe(client rpc.Client, req *wecommon.RPCRequest, resp *interface{}, userConn userconn.UserConn) error { //nolint: gocognit

Check failure on line 284 in tools/walletextension/accountmanager/account_manager.go

View workflow job for this annotation

GitHub Actions / lint

func `(*AccountManager).executeSubscribe` is unused (unused)
if len(req.Params) == 0 {
return fmt.Errorf("could not subscribe as no subscription namespace was provided")
}
m.logger.Info(fmt.Sprintf("Subscribing client: %s for request: %s", client, req))
ch := make(chan common.IDAndLog)
subscription, err := client.Subscribe(context.Background(), resp, rpc.SubscribeNamespace, ch, req.Params...)
if err != nil {
return fmt.Errorf("could not call %s with params %v. Cause: %w", req.Method, req.Params, err)
}

// We listen for incoming messages on the subscription.
go func() {
for {
select {
case idAndLog := <-ch:
if userConn.IsClosed() {
m.logger.Info("received log but websocket was closed on subscription", log.SubIDKey, idAndLog.SubID)
return
}

jsonResponse, err := wecommon.PrepareLogResponse(idAndLog)
if err != nil {
m.logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, idAndLog.SubID, log.ErrKey, err)
continue
}

m.logger.Trace(fmt.Sprintf("Forwarding log from Obscuro node: %s", jsonResponse), log.SubIDKey, idAndLog.SubID)
err = userConn.WriteResponse(jsonResponse)
if err != nil {
m.logger.Error("could not write the JSON log to the websocket on subscription %", log.SubIDKey, idAndLog.SubID, log.ErrKey, err)
continue
}

case err = <-subscription.Err():
// An error on this channel means the subscription has ended, so we exit the loop.
if userConn != nil && err != nil {
userConn.HandleError(err.Error())
}

return
}
}
}()

// We periodically check if the websocket is closed, and terminate the subscription.
go func() {
for {
if userConn.IsClosed() {
subscription.Unsubscribe()
return
}
time.Sleep(100 * time.Millisecond)
}
}()

return nil
}

func submitCall(client *rpc.EncRPCClient, req *wecommon.RPCRequest, resp *interface{}) error {
if req.Method == rpc.Call || req.Method == rpc.EstimateGas {
Expand Down
Loading

0 comments on commit e877138

Please sign in to comment.