Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network tests: new heads subscriptions #1907

Merged
merged 2 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion integration/networktest/actions/setup_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ func (c *CreateTestUser) Run(ctx context.Context, network networktest.NetworkCon
if err != nil {
return ctx, fmt.Errorf("failed to get required gateway URL: %w", err)
}
user, err = userwallet.NewGatewayUser(wal, gwURL, logger)
gwWSURL, err := network.GetGatewayWSURL()
if err != nil {
return ctx, fmt.Errorf("failed to get required gateway WS URL: %w", err)
}
user, err = userwallet.NewGatewayUser(wal, gwURL, gwWSURL, logger)
if err != nil {
return ctx, fmt.Errorf("failed to create gateway user: %w", err)
}
Expand Down
74 changes: 74 additions & 0 deletions integration/networktest/actions/subscription_actions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package actions

import (
"context"
"fmt"
"time"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ten-protocol/go-ten/integration/networktest"
"github.com/ten-protocol/go-ten/integration/networktest/userwallet"
)

type recordNewHeadsSubscriptionAction struct {
duration time.Duration
gatewayUser int // -1 if not using gateway, else test user index to get gateway from

recordedHeads []*types.Header
}

func (r *recordNewHeadsSubscriptionAction) Run(ctx context.Context, network networktest.NetworkConnector) (context.Context, error) {
// get gateway address for first user
user, err := FetchTestUser(ctx, r.gatewayUser)
if err != nil {
return ctx, err
}
// verify user is a gateway user
gwUser, ok := user.(*userwallet.GatewayUser)
if !ok {
return ctx, fmt.Errorf("user is not a gateway user")
}
ethClient, err := gwUser.WSClient()
if err != nil {
return ctx, err
}
headsCh := make(chan *types.Header)
sub, err := ethClient.SubscribeNewHead(ctx, headsCh)
if err != nil {
return nil, err
}
startTime := time.Now()
fmt.Println("Listening for new heads")
// read from headsCh for duration or until subscription is closed
for time.Since(startTime) < r.duration {
select {
case head := <-headsCh:
// read and store head from headsCh, then continue listening if duration has not expired
fmt.Printf("Received new head: %v\n", head.Number)
r.recordedHeads = append(r.recordedHeads, head)
case <-time.After(500 * time.Millisecond):
// no new head received, continue listening if duration has not expired
case <-sub.Err():
// subscription closed
return ctx, fmt.Errorf("subscription closed unexpectedly")
case <-ctx.Done():
sub.Unsubscribe()
return ctx, fmt.Errorf("context cancelled")
}
}
sub.Unsubscribe()
return ctx, nil
}

func (r *recordNewHeadsSubscriptionAction) Verify(_ context.Context, _ networktest.NetworkConnector) error {
if len(r.recordedHeads) == 0 {
return fmt.Errorf("no new heads received during the %s period", r.duration)
}
return nil
}

func RecordNewHeadsSubscription(duration time.Duration) networktest.Action {
// for now this test expects a gateway user and tests via the gateway
// todo: add support for testing without a gateway (need to add newHeads subscription to ObsClient)
return &recordNewHeadsSubscriptionAction{duration: duration, gatewayUser: 0}
}
5 changes: 4 additions & 1 deletion integration/networktest/env/network_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ func SepoliaTestnet(opts ...TestnetEnvOption) networktest.Environment {
[]string{"http://erpc.sepolia-testnet.ten.xyz:80"},
"http://sepolia-testnet-faucet.uksouth.azurecontainer.io/fund/eth",
"https://rpc.sepolia.org/",
"https://testnet.ten.xyz", // :81 for websocket
"https://testnet.ten.xyz",
"wss://testnet.ten.xyz:81",
)
return newTestnetEnv(connector, opts...)
}
Expand All @@ -36,6 +37,7 @@ func UATTestnet(opts ...TestnetEnvOption) networktest.Environment {
"http://uat-testnet-faucet.uksouth.azurecontainer.io/fund/eth",
"ws://uat-testnet-eth2network.uksouth.cloudapp.azure.com:9000",
"https://uat-testnet.ten.xyz",
"wss://uat-testnet.ten.xyz:81",
)
return newTestnetEnv(connector, opts...)
}
Expand All @@ -47,6 +49,7 @@ func DevTestnet(opts ...TestnetEnvOption) networktest.Environment {
"http://dev-testnet-faucet.uksouth.azurecontainer.io/fund/eth",
"ws://dev-testnet-eth2network.uksouth.cloudapp.azure.com:9000",
"https://dev-testnet.ten.xyz",
"wss://dev-testnet.ten.xyz:81",
)
return newTestnetEnv(connector, opts...)
}
Expand Down
8 changes: 7 additions & 1 deletion integration/networktest/env/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@ type testnetConnector struct {
faucetHTTPAddress string
l1RPCURL string
tenGatewayURL string
tenGatewayWSURL string
faucetWallet userwallet.User
}

func newTestnetConnector(seqRPCAddr string, validatorRPCAddressses []string, faucetHTTPAddress string, l1WSURL string, tenGatewayURL string) *testnetConnector {
func newTestnetConnector(seqRPCAddr string, validatorRPCAddressses []string, faucetHTTPAddress string, l1WSURL string, tenGatewayURL string, tenGatewayWSURL string) *testnetConnector {
return &testnetConnector{
seqRPCAddress: seqRPCAddr,
validatorRPCAddresses: validatorRPCAddressses,
faucetHTTPAddress: faucetHTTPAddress,
l1RPCURL: l1WSURL,
tenGatewayURL: tenGatewayURL,
tenGatewayWSURL: tenGatewayWSURL,
}
}

Expand Down Expand Up @@ -146,3 +148,7 @@ func (t *testnetConnector) GetGatewayClient() (ethadapter.EthClient, error) {
func (t *testnetConnector) GetGatewayURL() (string, error) {
return t.tenGatewayURL, nil
}

func (t *testnetConnector) GetGatewayWSURL() (string, error) {
return t.tenGatewayWSURL, nil
}
1 change: 1 addition & 0 deletions integration/networktest/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type NetworkConnector interface {
GetL1Client() (ethadapter.EthClient, error)
GetMCOwnerWallet() (wallet.Wallet, error) // wallet that owns the management contract (network admin)
GetGatewayURL() (string, error)
GetGatewayWSURL() (string, error)
}

// Action is any step in a test, they will typically be either minimally small steps in the test or they will be containers
Expand Down
28 changes: 28 additions & 0 deletions integration/networktest/tests/subscription/subscriptions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package subscription

import (
"testing"
"time"

"github.com/ten-protocol/go-ten/integration/networktest"
"github.com/ten-protocol/go-ten/integration/networktest/actions"
"github.com/ten-protocol/go-ten/integration/networktest/env"
"github.com/ten-protocol/go-ten/integration/simulation/devnetwork"
)

func TestGatewayNewHeadsSubscription(t *testing.T) {
networktest.TestOnlyRunsInIDE(t)
networktest.Run(
"gateway-new-heads-subscription",
t,
env.LocalDevNetwork(devnetwork.WithGateway()),
actions.Series(
// user not technically needed, but we need a gateway address to use
&actions.CreateTestUser{UserID: 0, UseGateway: true},
actions.SetContextValue(actions.KeyNumberOfTestUsers, 1),

// Record new heads for specified duration, verify that the subscription is working
actions.RecordNewHeadsSubscription(20*time.Second),
),
)
}
20 changes: 16 additions & 4 deletions integration/networktest/userwallet/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ import (
type GatewayUser struct {
wal wallet.Wallet

gwLib *lib.TGLib // TenGateway utility
client *ethclient.Client
gwLib *lib.TGLib // TenGateway utility
client *ethclient.Client
wsClient *ethclient.Client // lazily initialized websocket client

// state managed by the wallet
nonce uint64

logger gethlog.Logger
}

func NewGatewayUser(wal wallet.Wallet, gatewayURL string, logger gethlog.Logger) (*GatewayUser, error) {
gwLib := lib.NewTenGatewayLibrary(gatewayURL, "") // not providing wsURL for now, add if we need it
func NewGatewayUser(wal wallet.Wallet, gatewayURL string, gatewayWSURL string, logger gethlog.Logger) (*GatewayUser, error) {
gwLib := lib.NewTenGatewayLibrary(gatewayURL, gatewayWSURL)

err := gwLib.Join()
if err != nil {
Expand Down Expand Up @@ -112,3 +113,14 @@ func (g *GatewayUser) NativeBalance(ctx context.Context) (*big.Int, error) {
func (g *GatewayUser) Wallet() wallet.Wallet {
return g.wal
}

func (g *GatewayUser) WSClient() (*ethclient.Client, error) {
if g.wsClient == nil {
var err error
g.wsClient, err = ethclient.Dial(g.gwLib.WS())
if err != nil {
return nil, fmt.Errorf("failed to dial TenGateway WS: %w", err)
}
}
return g.wsClient, nil
}
12 changes: 10 additions & 2 deletions integration/simulation/devnetwork/dev_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ func (s *InMemDevNetwork) GetGatewayURL() (string, error) {
return fmt.Sprintf("http://localhost:%d", _gwHTTPPort), nil
}

func (s *InMemDevNetwork) GetGatewayWSURL() (string, error) {
if !s.tenConfig.TenGatewayEnabled {
return "", fmt.Errorf("ten gateway not enabled")
}
return fmt.Sprintf("ws://localhost:%d", _gwWSPort), nil
}

func (s *InMemDevNetwork) GetMCOwnerWallet() (wallet.Wallet, error) {
return s.networkWallets.MCOwnerWallet, nil
}
Expand Down Expand Up @@ -147,11 +154,12 @@ func (s *InMemDevNetwork) Start() {
fmt.Println("Starting obscuro nodes")
s.startNodes()

// sleep to allow the nodes to start
time.Sleep(10 * time.Second)

if s.tenConfig.TenGatewayEnabled {
s.startTenGateway()
}
// sleep to allow the nodes to start
time.Sleep(10 * time.Second)
}

func (s *InMemDevNetwork) GetGatewayClient() (ethadapter.EthClient, error) {
Expand Down
Loading