diff --git a/integration/networktest/actions/setup_actions.go b/integration/networktest/actions/setup_actions.go index a6a6f02556..87efeaa46c 100644 --- a/integration/networktest/actions/setup_actions.go +++ b/integration/networktest/actions/setup_actions.go @@ -31,7 +31,11 @@ func (c *CreateTestUser) Run(ctx context.Context, network networktest.NetworkCon if err != nil { return ctx, fmt.Errorf("failed to get required gateway URL: %w", err) } - user, err = userwallet.NewGatewayUser(wal, gwURL, logger) + gwWSURL, err := network.GetGatewayWSURL() + if err != nil { + return ctx, fmt.Errorf("failed to get required gateway WS URL: %w", err) + } + user, err = userwallet.NewGatewayUser(wal, gwURL, gwWSURL, logger) if err != nil { return ctx, fmt.Errorf("failed to create gateway user: %w", err) } diff --git a/integration/networktest/actions/subscription_actions.go b/integration/networktest/actions/subscription_actions.go new file mode 100644 index 0000000000..99ab7a27af --- /dev/null +++ b/integration/networktest/actions/subscription_actions.go @@ -0,0 +1,74 @@ +package actions + +import ( + "context" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ten-protocol/go-ten/integration/networktest" + "github.com/ten-protocol/go-ten/integration/networktest/userwallet" +) + +type recordNewHeadsSubscriptionAction struct { + duration time.Duration + gatewayUser int // -1 if not using gateway, else test user index to get gateway from + + recordedHeads []*types.Header +} + +func (r *recordNewHeadsSubscriptionAction) Run(ctx context.Context, network networktest.NetworkConnector) (context.Context, error) { + // get gateway address for first user + user, err := FetchTestUser(ctx, r.gatewayUser) + if err != nil { + return ctx, err + } + // verify user is a gateway user + gwUser, ok := user.(*userwallet.GatewayUser) + if !ok { + return ctx, fmt.Errorf("user is not a gateway user") + } + ethClient, err := gwUser.WSClient() + if err != nil { + return ctx, err + } + headsCh := make(chan *types.Header) + sub, err := ethClient.SubscribeNewHead(ctx, headsCh) + if err != nil { + return nil, err + } + startTime := time.Now() + fmt.Println("Listening for new heads") + // read from headsCh for duration or until subscription is closed + for time.Since(startTime) < r.duration { + select { + case head := <-headsCh: + // read and store head from headsCh, then continue listening if duration has not expired + fmt.Printf("Received new head: %v\n", head.Number) + r.recordedHeads = append(r.recordedHeads, head) + case <-time.After(500 * time.Millisecond): + // no new head received, continue listening if duration has not expired + case <-sub.Err(): + // subscription closed + return ctx, fmt.Errorf("subscription closed unexpectedly") + case <-ctx.Done(): + sub.Unsubscribe() + return ctx, fmt.Errorf("context cancelled") + } + } + sub.Unsubscribe() + return ctx, nil +} + +func (r *recordNewHeadsSubscriptionAction) Verify(_ context.Context, _ networktest.NetworkConnector) error { + if len(r.recordedHeads) == 0 { + return fmt.Errorf("no new heads received during the %s period", r.duration) + } + return nil +} + +func RecordNewHeadsSubscription(duration time.Duration) networktest.Action { + // for now this test expects a gateway user and tests via the gateway + // todo: add support for testing without a gateway (need to add newHeads subscription to ObsClient) + return &recordNewHeadsSubscriptionAction{duration: duration, gatewayUser: 0} +} diff --git a/integration/networktest/env/network_setup.go b/integration/networktest/env/network_setup.go index 86d45bb8e7..06ae70ab75 100644 --- a/integration/networktest/env/network_setup.go +++ b/integration/networktest/env/network_setup.go @@ -24,7 +24,8 @@ func SepoliaTestnet(opts ...TestnetEnvOption) networktest.Environment { []string{"http://erpc.sepolia-testnet.ten.xyz:80"}, "http://sepolia-testnet-faucet.uksouth.azurecontainer.io/fund/eth", "https://rpc.sepolia.org/", - "https://testnet.ten.xyz", // :81 for websocket + "https://testnet.ten.xyz", + "wss://testnet.ten.xyz:81", ) return newTestnetEnv(connector, opts...) } @@ -36,6 +37,7 @@ func UATTestnet(opts ...TestnetEnvOption) networktest.Environment { "http://uat-testnet-faucet.uksouth.azurecontainer.io/fund/eth", "ws://uat-testnet-eth2network.uksouth.cloudapp.azure.com:9000", "https://uat-testnet.ten.xyz", + "wss://uat-testnet.ten.xyz:81", ) return newTestnetEnv(connector, opts...) } @@ -47,6 +49,7 @@ func DevTestnet(opts ...TestnetEnvOption) networktest.Environment { "http://dev-testnet-faucet.uksouth.azurecontainer.io/fund/eth", "ws://dev-testnet-eth2network.uksouth.cloudapp.azure.com:9000", "https://dev-testnet.ten.xyz", + "wss://dev-testnet.ten.xyz:81", ) return newTestnetEnv(connector, opts...) } diff --git a/integration/networktest/env/testnet.go b/integration/networktest/env/testnet.go index f71ea82703..dd6512abee 100644 --- a/integration/networktest/env/testnet.go +++ b/integration/networktest/env/testnet.go @@ -33,16 +33,18 @@ type testnetConnector struct { faucetHTTPAddress string l1RPCURL string tenGatewayURL string + tenGatewayWSURL string faucetWallet userwallet.User } -func newTestnetConnector(seqRPCAddr string, validatorRPCAddressses []string, faucetHTTPAddress string, l1WSURL string, tenGatewayURL string) *testnetConnector { +func newTestnetConnector(seqRPCAddr string, validatorRPCAddressses []string, faucetHTTPAddress string, l1WSURL string, tenGatewayURL string, tenGatewayWSURL string) *testnetConnector { return &testnetConnector{ seqRPCAddress: seqRPCAddr, validatorRPCAddresses: validatorRPCAddressses, faucetHTTPAddress: faucetHTTPAddress, l1RPCURL: l1WSURL, tenGatewayURL: tenGatewayURL, + tenGatewayWSURL: tenGatewayWSURL, } } @@ -146,3 +148,7 @@ func (t *testnetConnector) GetGatewayClient() (ethadapter.EthClient, error) { func (t *testnetConnector) GetGatewayURL() (string, error) { return t.tenGatewayURL, nil } + +func (t *testnetConnector) GetGatewayWSURL() (string, error) { + return t.tenGatewayWSURL, nil +} diff --git a/integration/networktest/interfaces.go b/integration/networktest/interfaces.go index c751328ae8..ac8d9aba25 100644 --- a/integration/networktest/interfaces.go +++ b/integration/networktest/interfaces.go @@ -26,6 +26,7 @@ type NetworkConnector interface { GetL1Client() (ethadapter.EthClient, error) GetMCOwnerWallet() (wallet.Wallet, error) // wallet that owns the management contract (network admin) GetGatewayURL() (string, error) + GetGatewayWSURL() (string, error) } // Action is any step in a test, they will typically be either minimally small steps in the test or they will be containers diff --git a/integration/networktest/tests/subscription/subscriptions_test.go b/integration/networktest/tests/subscription/subscriptions_test.go new file mode 100644 index 0000000000..1f86243b9e --- /dev/null +++ b/integration/networktest/tests/subscription/subscriptions_test.go @@ -0,0 +1,28 @@ +package subscription + +import ( + "testing" + "time" + + "github.com/ten-protocol/go-ten/integration/networktest" + "github.com/ten-protocol/go-ten/integration/networktest/actions" + "github.com/ten-protocol/go-ten/integration/networktest/env" + "github.com/ten-protocol/go-ten/integration/simulation/devnetwork" +) + +func TestGatewayNewHeadsSubscription(t *testing.T) { + networktest.TestOnlyRunsInIDE(t) + networktest.Run( + "gateway-new-heads-subscription", + t, + env.LocalDevNetwork(devnetwork.WithGateway()), + actions.Series( + // user not technically needed, but we need a gateway address to use + &actions.CreateTestUser{UserID: 0, UseGateway: true}, + actions.SetContextValue(actions.KeyNumberOfTestUsers, 1), + + // Record new heads for specified duration, verify that the subscription is working + actions.RecordNewHeadsSubscription(20*time.Second), + ), + ) +} diff --git a/integration/networktest/userwallet/gateway.go b/integration/networktest/userwallet/gateway.go index 0853595489..03fa361a9d 100644 --- a/integration/networktest/userwallet/gateway.go +++ b/integration/networktest/userwallet/gateway.go @@ -20,8 +20,9 @@ import ( type GatewayUser struct { wal wallet.Wallet - gwLib *lib.TGLib // TenGateway utility - client *ethclient.Client + gwLib *lib.TGLib // TenGateway utility + client *ethclient.Client + wsClient *ethclient.Client // lazily initialized websocket client // state managed by the wallet nonce uint64 @@ -29,8 +30,8 @@ type GatewayUser struct { logger gethlog.Logger } -func NewGatewayUser(wal wallet.Wallet, gatewayURL string, logger gethlog.Logger) (*GatewayUser, error) { - gwLib := lib.NewTenGatewayLibrary(gatewayURL, "") // not providing wsURL for now, add if we need it +func NewGatewayUser(wal wallet.Wallet, gatewayURL string, gatewayWSURL string, logger gethlog.Logger) (*GatewayUser, error) { + gwLib := lib.NewTenGatewayLibrary(gatewayURL, gatewayWSURL) err := gwLib.Join() if err != nil { @@ -112,3 +113,14 @@ func (g *GatewayUser) NativeBalance(ctx context.Context) (*big.Int, error) { func (g *GatewayUser) Wallet() wallet.Wallet { return g.wal } + +func (g *GatewayUser) WSClient() (*ethclient.Client, error) { + if g.wsClient == nil { + var err error + g.wsClient, err = ethclient.Dial(g.gwLib.WS()) + if err != nil { + return nil, fmt.Errorf("failed to dial TenGateway WS: %w", err) + } + } + return g.wsClient, nil +} diff --git a/integration/simulation/devnetwork/dev_network.go b/integration/simulation/devnetwork/dev_network.go index 25cd70f86b..2315393403 100644 --- a/integration/simulation/devnetwork/dev_network.go +++ b/integration/simulation/devnetwork/dev_network.go @@ -71,6 +71,13 @@ func (s *InMemDevNetwork) GetGatewayURL() (string, error) { return fmt.Sprintf("http://localhost:%d", _gwHTTPPort), nil } +func (s *InMemDevNetwork) GetGatewayWSURL() (string, error) { + if !s.tenConfig.TenGatewayEnabled { + return "", fmt.Errorf("ten gateway not enabled") + } + return fmt.Sprintf("ws://localhost:%d", _gwWSPort), nil +} + func (s *InMemDevNetwork) GetMCOwnerWallet() (wallet.Wallet, error) { return s.networkWallets.MCOwnerWallet, nil } @@ -147,11 +154,12 @@ func (s *InMemDevNetwork) Start() { fmt.Println("Starting obscuro nodes") s.startNodes() + // sleep to allow the nodes to start + time.Sleep(10 * time.Second) + if s.tenConfig.TenGatewayEnabled { s.startTenGateway() } - // sleep to allow the nodes to start - time.Sleep(10 * time.Second) } func (s *InMemDevNetwork) GetGatewayClient() (ethadapter.EthClient, error) {