Skip to content

Commit

Permalink
Remove ChainClientRPC interface
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanTinianov committed Jun 28, 2024
1 parent 8886d0c commit a83d342
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 1,272 deletions.
2 changes: 1 addition & 1 deletion common/client/multi_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) ChainID() CHAIN_ID {
return c.chainID
}

func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) DoAll(ctx context.Context, do func(ctx context.Context, rpc RPC_CLIENT, isSendOnly bool) bool) error {
func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) DoAll(ctx context.Context, do func(ctx context.Context, rpc RPC_CLIENT, isSendOnly bool)) error {
callsCompleted := 0
for _, n := range c.primaryNodes {
if ctx.Err() != nil {
Expand Down
6 changes: 3 additions & 3 deletions common/client/node_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
newDialedNode := func(t *testing.T, opts testNodeOpts) testNode {
node := newTestNode(t, opts)
opts.rpc.On("Close").Return(nil).Once()
opts.rpc.On("UnsubscribeAllExcept", mock.Anything, mock.Anything).Maybe()

node.setState(NodeStateDialed)
return node
Expand Down Expand Up @@ -164,7 +165,6 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
pollError := errors.New("failed to get ClientVersion")
rpc.On("Ping", mock.Anything).Return(pollError)
// disconnects all on transfer to unreachable
rpc.On("UnsubscribeAllExcept", mock.Anything, mock.Anything).Once()
// might be called in unreachable loop
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe()
node.declareAlive()
Expand Down Expand Up @@ -344,7 +344,6 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
})
defer func() { assert.NoError(t, node.close()) }()
// disconnects all on transfer to unreachable or outOfSync
rpc.On("UnsubscribeAllExcept", mock.Anything, mock.Anything).Once()
// might be called in unreachable loop
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe()
node.declareAlive()
Expand Down Expand Up @@ -1140,7 +1139,7 @@ func TestUnit_NodeLifecycle_start(t *testing.T) {

newNode := func(t *testing.T, opts testNodeOpts) testNode {
node := newTestNode(t, opts)
opts.rpc.On("UnsubscribeAllExcept", nil, nil)
opts.rpc.On("UnsubscribeAllExcept", nil, nil).Maybe()
opts.rpc.On("Close").Return(nil).Once()

return node
Expand Down Expand Up @@ -1452,6 +1451,7 @@ func TestUnit_NodeLifecycle_SyncingLoop(t *testing.T) {
opts.config.nodeIsSyncingEnabled = true
node := newTestNode(t, opts)
opts.rpc.On("Close").Return(nil).Once()
opts.rpc.On("UnsubscribeAllExcept", mock.Anything, mock.Anything).Maybe()

node.setState(NodeStateDialed)
return node
Expand Down
13 changes: 6 additions & 7 deletions core/chains/evm/client/chain_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func ContextWithDefaultTimeout() (ctx context.Context, cancel context.CancelFunc
type chainClient struct {
multiNode *commonclient.MultiNode[
*big.Int,
ChainClientRPC,
*RpcClient,
]
logger logger.SugaredLogger
chainType chaintype.ChainType
Expand All @@ -112,8 +112,8 @@ func NewChainClient(
lggr logger.Logger,
selectionMode string,
leaseDuration time.Duration,
nodes []commonclient.Node[*big.Int, ChainClientRPC],
sendonlys []commonclient.SendOnlyNode[*big.Int, ChainClientRPC],
nodes []commonclient.Node[*big.Int, *RpcClient],
sendonlys []commonclient.SendOnlyNode[*big.Int, *RpcClient],
chainID *big.Int,
clientErrors evmconfig.ClientErrors,
) Client {
Expand Down Expand Up @@ -163,13 +163,13 @@ func (c *chainClient) BatchCallContextAll(ctx context.Context, b []ethrpc.BatchE
return selectionErr
}

doFunc := func(ctx context.Context, rpc ChainClientRPC, isSendOnly bool) bool {
doFunc := func(ctx context.Context, rpc *RpcClient, isSendOnly bool) {
if rpc == main {
return true
return
}
// Parallel call made to all other nodes with ignored return value
wg.Add(1)
go func(rpc ChainClientRPC) {
go func(rpc *RpcClient) {
defer wg.Done()
err := rpc.BatchCallContext(ctx, b)
if err != nil {
Expand All @@ -178,7 +178,6 @@ func (c *chainClient) BatchCallContextAll(ctx context.Context, b []ethrpc.BatchE
c.logger.Debug("Secondary node BatchCallContext success")
}
}(rpc)
return true
}

if err := c.multiNode.DoAll(ctx, doFunc); err != nil {
Expand Down
139 changes: 89 additions & 50 deletions core/chains/evm/client/chain_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package client_test
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"net/url"
Expand All @@ -16,10 +15,8 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
pkgerrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"

Expand Down Expand Up @@ -745,60 +742,103 @@ func TestEthClient_SubscribeNewHead(t *testing.T) {
sub.Unsubscribe()
}

func newMockRpc(t *testing.T) *client.MockChainClientRPC {
mockRpc := client.NewMockChainClientRPC(t)
mockRpc.On("Dial", mock.Anything).Return(nil).Once()
mockRpc.On("Close").Return(nil).Once()
mockRpc.On("ChainID", mock.Anything).Return(testutils.FixtureChainID, nil).Once()
// node does not always manage to fully setup aliveLoop, so we have to make calls optional to avoid flakes
mockRpc.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return(client.NewMockSubscription(), nil).Maybe()
mockRpc.On("SetAliveLoopSub", mock.Anything).Return().Maybe()
sub := client.NewMockSubscription()
mockRpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan *evmtypes.Head), sub, nil).Maybe()
mockRpc.On("Unsubscribe", mock.Anything).Return(nil).Maybe()
return mockRpc
}

func TestChainClient_BatchCallContext(t *testing.T) {
/*
func TestEthClient_BatchCallContext(t *testing.T) {
t.Parallel()
t.Run("batch requests return errors", func(t *testing.T) {
ctx := tests.Context(t)
rpcError := errors.New("something went wrong")
blockNumResp := ""
blockNum := hexutil.EncodeBig(big.NewInt(42))
b := []rpc.BatchElem{
{
Method: "eth_getBlockByNumber",
Args: []interface{}{blockNum, true},
Result: &types.Block{},
},
{
Method: "eth_blockNumber",
Result: &blockNumResp,
},
// Set up the WebSocket server
wsServer := testutils.NewWSServer(t, testutils.FixtureChainID, func(method string, params gjson.Result) (resp testutils.JSONRPCResponse) {
switch method {
case "eth_subscribe":
resp.Result = `"0x00"`
return
case "eth_unsubscribe":
resp.Result = "true"
return
}
return
})
defer wsServer.Close()
wsURL := wsServer.WSURL().String()
// Set up the HTTP mock server
handler := func(w http.ResponseWriter, r *http.Request) {
var requests []rpc.BatchElem
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&requests)
require.NoError(t, err)
mockRpc := newMockRpc(t)
mockRpc.On("BatchCallContext", mock.Anything, b).Run(func(args mock.Arguments) {
reqs := args.Get(1).([]rpc.BatchElem)
for i := 0; i < len(reqs); i++ {
elem := &reqs[i]
elem.Error = rpcError
responses := make([]map[string]interface{}, len(requests))
for i, req := range requests {
resp := map[string]interface{}{
"jsonrpc": "2.0",
"id": req.ID,
}
}).Return(nil).Maybe()
client := client.NewChainClientWithMockedRpc(t, commonclient.NodeSelectionModeRoundRobin, time.Second*0, time.Second*0, testutils.FixtureChainID, mockRpc)
err := client.Dial(ctx)
require.NoError(t, err)
switch req.Method {
case "eth_getBlockByNumber":
block := map[string]interface{}{
"number": "0x2a", // 42 in hex
"hash": "0x123",
"transactions": []interface{}{},
"uncles": []interface{}{},
}
resp["result"] = block
case "eth_blockNumber":
resp["result"] = "0x2a" // 42 in hex
default:
resp["error"] = map[string]interface{}{
"code": -32601,
"message": "Method not found",
}
}
responses[i] = resp
}
err = client.BatchCallContext(ctx, b)
encoder := json.NewEncoder(w)
err = encoder.Encode(responses)
require.NoError(t, err)
for _, elem := range b {
require.ErrorIs(t, rpcError, elem.Error)
}
})
}
httpServer := httptest.NewServer(http.HandlerFunc(handler))
defer httpServer.Close()
parsedHttpURL, err := url.Parse(httpServer.URL)
require.NoError(t, err)
// Create a client and connect to the mock servers
cfg := client.TestNodePoolConfig{
NodeSelectionMode: commonclient.NodeSelectionModeRoundRobin,
}
c, err := client.NewChainClientWithTestNode(t, cfg, time.Second*0, cfg.NodeLeaseDuration, wsURL, parsedHttpURL, nil, 42, testutils.FixtureChainID)
require.NoError(t, err)
require.NoError(t, c.Dial(context.Background()))
// Prepare batch requests
blockNum := hexutil.EncodeBig(big.NewInt(42))
batch := []rpc.BatchElem{
{
Method: "eth_getBlockByNumber",
Args: []interface{}{blockNum, false},
Result: new(types.Block),
},
{
Method: "eth_blockNumber",
Result: new(hexutil.Big),
},
}
// Execute batch call
err = c.BatchCallContext(context.Background(), batch)
require.NoError(t, err)
// Verify responses
block := batch[0].Result.(*types.Block)
assert.Equal(t, big.NewInt(42), block.Number())
assert.Equal(t, common.HexToHash("0x123"), block.Hash())
assert.Equal(t, big.NewInt(42), (*big.Int)(batch[1].Result.(*hexutil.Big)))
}
*/

func TestEthClient_ErroringClient(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -829,8 +869,7 @@ func TestEthClient_ErroringClient(t *testing.T) {
require.Equal(t, err, commonclient.ErroringNodeError)

id := erroringClient.ConfiguredChainID()
var expected *big.Int
require.Equal(t, id, expected)
require.Equal(t, id, big.NewInt(0))

_, err = erroringClient.CodeAt(ctx, common.Address{}, nil)
require.Equal(t, err, commonclient.ErroringNodeError)
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/client/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (

func NewEvmClient(cfg evmconfig.NodePool, chainCfg commonclient.ChainConfig, clientErrors evmconfig.ClientErrors, lggr logger.Logger, chainID *big.Int, nodes []*toml.Node, chainType chaintype.ChainType) Client {
var empty url.URL
var primaries []commonclient.Node[*big.Int, ChainClientRPC]
var sendonlys []commonclient.SendOnlyNode[*big.Int, ChainClientRPC]
var primaries []commonclient.Node[*big.Int, *RpcClient]
var sendonlys []commonclient.SendOnlyNode[*big.Int, *RpcClient]
for i, node := range nodes {
if node.SendOnly != nil && *node.SendOnly {
rpc := NewRPCClient(cfg, lggr, empty, (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID,
Expand Down
14 changes: 7 additions & 7 deletions core/chains/evm/client/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,18 +134,18 @@ func NewChainClientWithTestNode(
}
rpc := NewRPCClient(nodePoolCfg, lggr, *parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary)

n := commonclient.NewNode[*big.Int, *evmtypes.Head, ChainClientRPC](
n := commonclient.NewNode[*big.Int, *evmtypes.Head, *RpcClient](
nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM")
primaries := []commonclient.Node[*big.Int, ChainClientRPC]{n}
primaries := []commonclient.Node[*big.Int, *RpcClient]{n}

var sendonlys []commonclient.SendOnlyNode[*big.Int, ChainClientRPC]
var sendonlys []commonclient.SendOnlyNode[*big.Int, *RpcClient]
for i, u := range sendonlyRPCURLs {
if u.Scheme != "http" && u.Scheme != "https" {
return nil, pkgerrors.Errorf("sendonly ethereum rpc url scheme must be http(s): %s", u.String())
}
var empty url.URL
rpc := NewRPCClient(nodePoolCfg, lggr, empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary)
s := commonclient.NewSendOnlyNode[*big.Int, ChainClientRPC](
s := commonclient.NewSendOnlyNode[*big.Int, *RpcClient](
lggr, u, fmt.Sprintf("eth-sendonly-%d", i), chainID, rpc)
sendonlys = append(sendonlys, s)
}
Expand Down Expand Up @@ -176,7 +176,7 @@ func NewChainClientWithMockedRpc(
leaseDuration time.Duration,
noNewHeadsThreshold time.Duration,
chainID *big.Int,
rpc ChainClientRPC,
rpc *RpcClient,
) Client {
lggr := logger.Test(t)

Expand All @@ -185,9 +185,9 @@ func NewChainClientWithMockedRpc(
}
parsed, _ := url.ParseRequestURI("ws://test")

n := commonclient.NewNode[*big.Int, *evmtypes.Head, ChainClientRPC](
n := commonclient.NewNode[*big.Int, *evmtypes.Head, *RpcClient](
cfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, nil, "eth-primary-node-0", 1, chainID, 1, rpc, "EVM")
primaries := []commonclient.Node[*big.Int, ChainClientRPC]{n}
primaries := []commonclient.Node[*big.Int, *RpcClient]{n}
clientErrors := NewTestClientErrors()
c := NewChainClient(lggr, selectionMode, leaseDuration, primaries, nil, chainID, &clientErrors)
t.Cleanup(c.Close)
Expand Down
Loading

0 comments on commit a83d342

Please sign in to comment.