Skip to content

Commit

Permalink
Merge remote-tracking branch 'refs/remotes/origin/main' into tudor/db…
Browse files Browse the repository at this point in the history
…_perf
  • Loading branch information
tudor-malene committed May 13, 2024
2 parents a209f99 + 6c12e16 commit cf50566
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 9 deletions.
6 changes: 5 additions & 1 deletion integration/networktest/actions/setup_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ func (c *CreateTestUser) Run(ctx context.Context, network networktest.NetworkCon
if err != nil {
return ctx, fmt.Errorf("failed to get required gateway URL: %w", err)
}
user, err = userwallet.NewGatewayUser(wal, gwURL, logger)
gwWSURL, err := network.GetGatewayWSURL()
if err != nil {
return ctx, fmt.Errorf("failed to get required gateway WS URL: %w", err)
}
user, err = userwallet.NewGatewayUser(wal, gwURL, gwWSURL, logger)
if err != nil {
return ctx, fmt.Errorf("failed to create gateway user: %w", err)
}
Expand Down
74 changes: 74 additions & 0 deletions integration/networktest/actions/subscription_actions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package actions

import (
"context"
"fmt"
"time"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ten-protocol/go-ten/integration/networktest"
"github.com/ten-protocol/go-ten/integration/networktest/userwallet"
)

type recordNewHeadsSubscriptionAction struct {
duration time.Duration
gatewayUser int // -1 if not using gateway, else test user index to get gateway from

recordedHeads []*types.Header
}

func (r *recordNewHeadsSubscriptionAction) Run(ctx context.Context, network networktest.NetworkConnector) (context.Context, error) {
// get gateway address for first user
user, err := FetchTestUser(ctx, r.gatewayUser)
if err != nil {
return ctx, err
}
// verify user is a gateway user
gwUser, ok := user.(*userwallet.GatewayUser)
if !ok {
return ctx, fmt.Errorf("user is not a gateway user")
}
ethClient, err := gwUser.WSClient()
if err != nil {
return ctx, err
}
headsCh := make(chan *types.Header)
sub, err := ethClient.SubscribeNewHead(ctx, headsCh)
if err != nil {
return nil, err
}
startTime := time.Now()
fmt.Println("Listening for new heads")
// read from headsCh for duration or until subscription is closed
for time.Since(startTime) < r.duration {
select {
case head := <-headsCh:
// read and store head from headsCh, then continue listening if duration has not expired
fmt.Printf("Received new head: %v\n", head.Number)
r.recordedHeads = append(r.recordedHeads, head)
case <-time.After(500 * time.Millisecond):
// no new head received, continue listening if duration has not expired
case <-sub.Err():
// subscription closed
return ctx, fmt.Errorf("subscription closed unexpectedly")
case <-ctx.Done():
sub.Unsubscribe()
return ctx, fmt.Errorf("context cancelled")
}
}
sub.Unsubscribe()
return ctx, nil
}

func (r *recordNewHeadsSubscriptionAction) Verify(_ context.Context, _ networktest.NetworkConnector) error {
if len(r.recordedHeads) == 0 {
return fmt.Errorf("no new heads received during the %s period", r.duration)
}
return nil
}

func RecordNewHeadsSubscription(duration time.Duration) networktest.Action {
// for now this test expects a gateway user and tests via the gateway
// todo: add support for testing without a gateway (need to add newHeads subscription to ObsClient)
return &recordNewHeadsSubscriptionAction{duration: duration, gatewayUser: 0}
}
5 changes: 4 additions & 1 deletion integration/networktest/env/network_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ func SepoliaTestnet(opts ...TestnetEnvOption) networktest.Environment {
[]string{"http://erpc.sepolia-testnet.ten.xyz:80"},
"http://sepolia-testnet-faucet.uksouth.azurecontainer.io/fund/eth",
"https://rpc.sepolia.org/",
"https://testnet.ten.xyz", // :81 for websocket
"https://testnet.ten.xyz",
"wss://testnet.ten.xyz:81",
)
return newTestnetEnv(connector, opts...)
}
Expand All @@ -36,6 +37,7 @@ func UATTestnet(opts ...TestnetEnvOption) networktest.Environment {
"http://uat-testnet-faucet.uksouth.azurecontainer.io/fund/eth",
"ws://uat-testnet-eth2network.uksouth.cloudapp.azure.com:9000",
"https://uat-testnet.ten.xyz",
"wss://uat-testnet.ten.xyz:81",
)
return newTestnetEnv(connector, opts...)
}
Expand All @@ -47,6 +49,7 @@ func DevTestnet(opts ...TestnetEnvOption) networktest.Environment {
"http://dev-testnet-faucet.uksouth.azurecontainer.io/fund/eth",
"ws://dev-testnet-eth2network.uksouth.cloudapp.azure.com:9000",
"https://dev-testnet.ten.xyz",
"wss://dev-testnet.ten.xyz:81",
)
return newTestnetEnv(connector, opts...)
}
Expand Down
8 changes: 7 additions & 1 deletion integration/networktest/env/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@ type testnetConnector struct {
faucetHTTPAddress string
l1RPCURL string
tenGatewayURL string
tenGatewayWSURL string
faucetWallet userwallet.User
}

func newTestnetConnector(seqRPCAddr string, validatorRPCAddressses []string, faucetHTTPAddress string, l1WSURL string, tenGatewayURL string) *testnetConnector {
func newTestnetConnector(seqRPCAddr string, validatorRPCAddressses []string, faucetHTTPAddress string, l1WSURL string, tenGatewayURL string, tenGatewayWSURL string) *testnetConnector {
return &testnetConnector{
seqRPCAddress: seqRPCAddr,
validatorRPCAddresses: validatorRPCAddressses,
faucetHTTPAddress: faucetHTTPAddress,
l1RPCURL: l1WSURL,
tenGatewayURL: tenGatewayURL,
tenGatewayWSURL: tenGatewayWSURL,
}
}

Expand Down Expand Up @@ -146,3 +148,7 @@ func (t *testnetConnector) GetGatewayClient() (ethadapter.EthClient, error) {
func (t *testnetConnector) GetGatewayURL() (string, error) {
return t.tenGatewayURL, nil
}

func (t *testnetConnector) GetGatewayWSURL() (string, error) {
return t.tenGatewayWSURL, nil
}
1 change: 1 addition & 0 deletions integration/networktest/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type NetworkConnector interface {
GetL1Client() (ethadapter.EthClient, error)
GetMCOwnerWallet() (wallet.Wallet, error) // wallet that owns the management contract (network admin)
GetGatewayURL() (string, error)
GetGatewayWSURL() (string, error)
}

// Action is any step in a test, they will typically be either minimally small steps in the test or they will be containers
Expand Down
28 changes: 28 additions & 0 deletions integration/networktest/tests/subscription/subscriptions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package subscription

import (
"testing"
"time"

"github.com/ten-protocol/go-ten/integration/networktest"
"github.com/ten-protocol/go-ten/integration/networktest/actions"
"github.com/ten-protocol/go-ten/integration/networktest/env"
"github.com/ten-protocol/go-ten/integration/simulation/devnetwork"
)

func TestGatewayNewHeadsSubscription(t *testing.T) {
networktest.TestOnlyRunsInIDE(t)
networktest.Run(
"gateway-new-heads-subscription",
t,
env.LocalDevNetwork(devnetwork.WithGateway()),
actions.Series(
// user not technically needed, but we need a gateway address to use
&actions.CreateTestUser{UserID: 0, UseGateway: true},
actions.SetContextValue(actions.KeyNumberOfTestUsers, 1),

// Record new heads for specified duration, verify that the subscription is working
actions.RecordNewHeadsSubscription(20*time.Second),
),
)
}
20 changes: 16 additions & 4 deletions integration/networktest/userwallet/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ import (
type GatewayUser struct {
wal wallet.Wallet

gwLib *lib.TGLib // TenGateway utility
client *ethclient.Client
gwLib *lib.TGLib // TenGateway utility
client *ethclient.Client
wsClient *ethclient.Client // lazily initialized websocket client

// state managed by the wallet
nonce uint64

logger gethlog.Logger
}

func NewGatewayUser(wal wallet.Wallet, gatewayURL string, logger gethlog.Logger) (*GatewayUser, error) {
gwLib := lib.NewTenGatewayLibrary(gatewayURL, "") // not providing wsURL for now, add if we need it
func NewGatewayUser(wal wallet.Wallet, gatewayURL string, gatewayWSURL string, logger gethlog.Logger) (*GatewayUser, error) {
gwLib := lib.NewTenGatewayLibrary(gatewayURL, gatewayWSURL)

err := gwLib.Join()
if err != nil {
Expand Down Expand Up @@ -112,3 +113,14 @@ func (g *GatewayUser) NativeBalance(ctx context.Context) (*big.Int, error) {
func (g *GatewayUser) Wallet() wallet.Wallet {
return g.wal
}

func (g *GatewayUser) WSClient() (*ethclient.Client, error) {
if g.wsClient == nil {
var err error
g.wsClient, err = ethclient.Dial(g.gwLib.WS())
if err != nil {
return nil, fmt.Errorf("failed to dial TenGateway WS: %w", err)
}
}
return g.wsClient, nil
}
12 changes: 10 additions & 2 deletions integration/simulation/devnetwork/dev_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ func (s *InMemDevNetwork) GetGatewayURL() (string, error) {
return fmt.Sprintf("http://localhost:%d", _gwHTTPPort), nil
}

func (s *InMemDevNetwork) GetGatewayWSURL() (string, error) {
if !s.tenConfig.TenGatewayEnabled {
return "", fmt.Errorf("ten gateway not enabled")
}
return fmt.Sprintf("ws://localhost:%d", _gwWSPort), nil
}

func (s *InMemDevNetwork) GetMCOwnerWallet() (wallet.Wallet, error) {
return s.networkWallets.MCOwnerWallet, nil
}
Expand Down Expand Up @@ -147,11 +154,12 @@ func (s *InMemDevNetwork) Start() {
fmt.Println("Starting obscuro nodes")
s.startNodes()

// sleep to allow the nodes to start
time.Sleep(10 * time.Second)

if s.tenConfig.TenGatewayEnabled {
s.startTenGateway()
}
// sleep to allow the nodes to start
time.Sleep(10 * time.Second)
}

func (s *InMemDevNetwork) GetGatewayClient() (ethadapter.EthClient, error) {
Expand Down

0 comments on commit cf50566

Please sign in to comment.