Skip to content

Commit

Permalink
Fix losing batch call errors for individual requests (#12127)
Browse files Browse the repository at this point in the history
* Fixed losing batch call errors for individual requests

* Fixed linting

* Added comments

* Fixed linting

* Improved error check in test
  • Loading branch information
amit-momin authored Feb 23, 2024
1 parent ec28bb9 commit d10b471
Show file tree
Hide file tree
Showing 11 changed files with 1,258 additions and 135 deletions.
70 changes: 35 additions & 35 deletions common/client/mock_rpc_test.go

Large diffs are not rendered by default.

94 changes: 49 additions & 45 deletions common/client/multi_node.go

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions common/client/multi_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
)

type multiNodeRPCClient RPC[types.ID, *big.Int, Hashable, Hashable, any, Hashable, any, any,
types.Receipt[Hashable, Hashable], Hashable, types.Head[Hashable]]
types.Receipt[Hashable, Hashable], Hashable, types.Head[Hashable], any]

type testMultiNode struct {
*multiNode[types.ID, *big.Int, Hashable, Hashable, any, Hashable, any, any,
types.Receipt[Hashable, Hashable], Hashable, types.Head[Hashable], multiNodeRPCClient]
types.Receipt[Hashable, Hashable], Hashable, types.Head[Hashable], multiNodeRPCClient, any]
}

type multiNodeOpts struct {
Expand All @@ -49,19 +49,19 @@ func newTestMultiNode(t *testing.T, opts multiNodeOpts) testMultiNode {
}

result := NewMultiNode[types.ID, *big.Int, Hashable, Hashable, any, Hashable, any, any,
types.Receipt[Hashable, Hashable], Hashable, types.Head[Hashable], multiNodeRPCClient](opts.logger,
types.Receipt[Hashable, Hashable], Hashable, types.Head[Hashable], multiNodeRPCClient, any](opts.logger,
opts.selectionMode, opts.leaseDuration, opts.noNewHeadsThreshold, opts.nodes, opts.sendonlys,
opts.chainID, opts.chainType, opts.chainFamily, opts.classifySendTxError, opts.sendTxSoftTimeout)
return testMultiNode{
result.(*multiNode[types.ID, *big.Int, Hashable, Hashable, any, Hashable, any, any,
types.Receipt[Hashable, Hashable], Hashable, types.Head[Hashable], multiNodeRPCClient]),
types.Receipt[Hashable, Hashable], Hashable, types.Head[Hashable], multiNodeRPCClient, any]),
}
}

func newMultiNodeRPCClient(t *testing.T) *mockRPC[types.ID, *big.Int, Hashable, Hashable, any, Hashable, any, any,
types.Receipt[Hashable, Hashable], Hashable, types.Head[Hashable]] {
types.Receipt[Hashable, Hashable], Hashable, types.Head[Hashable], any] {
return newMockRPC[types.ID, *big.Int, Hashable, Hashable, any, Hashable, any, any,
types.Receipt[Hashable, Hashable], Hashable, types.Head[Hashable]](t)
types.Receipt[Hashable, Hashable], Hashable, types.Head[Hashable], any](t)
}

func newHealthyNode(t *testing.T, chainID types.ID) *mockNode[types.ID, types.Head[Hashable], multiNodeRPCClient] {
Expand Down
6 changes: 4 additions & 2 deletions common/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type RPC[
TX_RECEIPT types.Receipt[TX_HASH, BLOCK_HASH],
FEE feetypes.Fee,
HEAD types.Head[BLOCK_HASH],

BATCH_ELEM any,
] interface {
NodeClient[
CHAIN_ID,
Expand All @@ -42,6 +42,7 @@ type RPC[
TX_RECEIPT,
FEE,
HEAD,
BATCH_ELEM,
]
}

Expand Down Expand Up @@ -84,6 +85,7 @@ type clientAPI[
TX_RECEIPT types.Receipt[TX_HASH, BLOCK_HASH],
FEE feetypes.Fee,
HEAD types.Head[BLOCK_HASH],
BATCH_ELEM any,
] interface {
connection[CHAIN_ID, HEAD]

Expand Down Expand Up @@ -118,7 +120,7 @@ type clientAPI[
FilterEvents(ctx context.Context, query EVENT_OPS) ([]EVENT, error)

// Misc
BatchCallContext(ctx context.Context, b []any) error
BatchCallContext(ctx context.Context, b []BATCH_ELEM) error
CallContract(
ctx context.Context,
msg interface{},
Expand Down
17 changes: 7 additions & 10 deletions core/chains/evm/client/chain_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type chainClient struct {
*assets.Wei,
*evmtypes.Head,
RPCCLient,
rpc.BatchElem,
]
logger logger.SugaredLogger
}
Expand Down Expand Up @@ -88,20 +89,16 @@ func (c *chainClient) BalanceAt(ctx context.Context, account common.Address, blo
return c.multiNode.BalanceAt(ctx, account, blockNumber)
}

// Request specific errors for batch calls are returned to the individual BatchElem.
// Ensure the same BatchElem slice provided by the caller is passed through the call stack
// to ensure the caller has access to the errors.
func (c *chainClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
batch := make([]any, len(b))
for i, arg := range b {
batch[i] = any(arg)
}
return c.multiNode.BatchCallContext(ctx, batch)
return c.multiNode.BatchCallContext(ctx, b)
}

// Similar to BatchCallContext, ensure the provided BatchElem slice is passed through
func (c *chainClient) BatchCallContextAll(ctx context.Context, b []rpc.BatchElem) error {
batch := make([]any, len(b))
for i, arg := range b {
batch[i] = any(arg)
}
return c.multiNode.BatchCallContextAll(ctx, batch)
return c.multiNode.BatchCallContextAll(ctx, b)
}

// TODO-1663: return custom Block type instead of geth's once client.go is deprecated.
Expand Down
70 changes: 70 additions & 0 deletions core/chains/evm/client/chain_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package client_test

import (
"errors"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

commonclient "github.com/smartcontractkit/chainlink/v2/common/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
)

func newMockRpc(t *testing.T) *mocks.RPCCLient {
mockRpc := mocks.NewRPCCLient(t)
mockRpc.On("Dial", mock.Anything).Return(nil).Once()
mockRpc.On("Close").Return(nil).Once()
mockRpc.On("ChainID", mock.Anything).Return(testutils.FixtureChainID, nil).Once()
mockRpc.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return(client.NewMockSubscription(), nil).Once()
mockRpc.On("SetAliveLoopSub", mock.Anything).Return().Once()
return mockRpc
}

func TestChainClient_BatchCallContext(t *testing.T) {
t.Parallel()

t.Run("batch requests return errors", func(t *testing.T) {
ctx := testutils.Context(t)
rpcError := errors.New("something went wrong")
blockNumResp := ""
blockNum := hexutil.EncodeBig(big.NewInt(42))
b := []rpc.BatchElem{
{
Method: "eth_getBlockByNumber",
Args: []interface{}{blockNum, true},
Result: &types.Block{},
},
{
Method: "eth_blockNumber",
Result: &blockNumResp,
},
}

mockRpc := newMockRpc(t)
mockRpc.On("BatchCallContext", mock.Anything, b).Run(func(args mock.Arguments) {
reqs := args.Get(1).([]rpc.BatchElem)
for i := 0; i < len(reqs); i++ {
elem := &reqs[i]
elem.Error = rpcError
}
}).Return(nil).Once()

client := client.NewChainClientWithMockedRpc(t, commonclient.NodeSelectionModeRoundRobin, time.Second*0, time.Second*0, testutils.FixtureChainID, mockRpc)
err := client.Dial(ctx)
require.NoError(t, err)

err = client.BatchCallContext(ctx, b)
require.NoError(t, err)
for _, elem := range b {
require.ErrorIs(t, rpcError, elem.Error)
}
})
}
26 changes: 5 additions & 21 deletions core/chains/evm/client/chain_id_sub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,6 @@ import (
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
)

type mockSubscription struct {
unsubscribed bool
Errors chan error
}

func newMockSubscription() *mockSubscription {
return &mockSubscription{Errors: make(chan error)}
}

func (mes *mockSubscription) Err() <-chan error { return mes.Errors }

func (mes *mockSubscription) Unsubscribe() {
mes.unsubscribed = true
close(mes.Errors)
}

func TestChainIDSubForwarder(t *testing.T) {
t.Parallel()

Expand All @@ -37,7 +21,7 @@ func TestChainIDSubForwarder(t *testing.T) {

ch := make(chan *evmtypes.Head)
forwarder := newChainIDSubForwarder(chainID, ch)
sub := newMockSubscription()
sub := NewMockSubscription()
err := forwarder.start(sub, nil)
assert.NoError(t, err)
forwarder.Unsubscribe()
Expand All @@ -54,7 +38,7 @@ func TestChainIDSubForwarder(t *testing.T) {

ch := make(chan *evmtypes.Head)
forwarder := newChainIDSubForwarder(chainID, ch)
sub := newMockSubscription()
sub := NewMockSubscription()
err := forwarder.start(sub, nil)
assert.NoError(t, err)
sub.Errors <- errors.New("boo")
Expand All @@ -72,7 +56,7 @@ func TestChainIDSubForwarder(t *testing.T) {

ch := make(chan *evmtypes.Head)
forwarder := newChainIDSubForwarder(chainID, ch)
sub := newMockSubscription()
sub := NewMockSubscription()
err := forwarder.start(sub, nil)
assert.NoError(t, err)
forwarder.srcCh <- &evmtypes.Head{}
Expand All @@ -90,7 +74,7 @@ func TestChainIDSubForwarder(t *testing.T) {

ch := make(chan *evmtypes.Head)
forwarder := newChainIDSubForwarder(chainID, ch)
sub := newMockSubscription()
sub := NewMockSubscription()
errIn := errors.New("foo")
errOut := forwarder.start(sub, errIn)
assert.Equal(t, errIn, errOut)
Expand All @@ -101,7 +85,7 @@ func TestChainIDSubForwarder(t *testing.T) {

ch := make(chan *evmtypes.Head)
forwarder := newChainIDSubForwarder(chainID, ch)
sub := newMockSubscription()
sub := NewMockSubscription()
err := forwarder.start(sub, nil)
assert.NoError(t, err)

Expand Down
42 changes: 42 additions & 0 deletions core/chains/evm/client/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,32 @@ func NewChainClientWithEmptyNode(
return c
}

func NewChainClientWithMockedRpc(
t *testing.T,
selectionMode string,
leaseDuration time.Duration,
noNewHeadsThreshold time.Duration,
chainID *big.Int,
rpc RPCCLient,
) Client {

lggr := logger.Test(t)

var chainType commonconfig.ChainType

cfg := TestNodePoolConfig{
NodeSelectionMode: NodeSelectionMode_RoundRobin,
}
parsed, _ := url.ParseRequestURI("ws://test")

n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCCLient](
cfg, noNewHeadsThreshold, lggr, *parsed, nil, "eth-primary-node-0", 1, chainID, 1, rpc, "EVM")
primaries := []commonclient.Node[*big.Int, *evmtypes.Head, RPCCLient]{n}
c := NewChainClient(lggr, selectionMode, leaseDuration, noNewHeadsThreshold, primaries, nil, chainID, chainType)
t.Cleanup(c.Close)
return c
}

type TestableSendOnlyNode interface {
SendOnlyNode
SetEthClient(newBatchSender BatchSender, newSender TxSender)
Expand All @@ -137,3 +163,19 @@ const HeadResult = `{"difficulty":"0xf3a00","extraData":"0xd88301050384676574688
func IsDialed(s SendOnlyNode) bool {
return s.(*sendOnlyNode).dialed
}

type mockSubscription struct {
unsubscribed bool
Errors chan error
}

func NewMockSubscription() *mockSubscription {
return &mockSubscription{Errors: make(chan error)}
}

func (mes *mockSubscription) Err() <-chan error { return mes.Errors }

func (mes *mockSubscription) Unsubscribe() {
mes.unsubscribed = true
close(mes.Errors)
}
Loading

0 comments on commit d10b471

Please sign in to comment.