From 540d54b2dc1708c7e75131b5ae920c22d72c8177 Mon Sep 17 00:00:00 2001 From: HenryNguyen5 <6404866+HenryNguyen5@users.noreply.github.com> Date: Fri, 26 Apr 2024 22:05:57 -0700 Subject: [PATCH 1/3] Revert "Create HeadPoller for Multi-Node (#12871)" This reverts commit 7338448469d014b4b1eb5aaedf0303f687e45f62. --- common/client/poller.go | 98 ----------------- common/client/poller_test.go | 207 ----------------------------------- 2 files changed, 305 deletions(-) delete mode 100644 common/client/poller.go delete mode 100644 common/client/poller_test.go diff --git a/common/client/poller.go b/common/client/poller.go deleted file mode 100644 index b21f28fe604..00000000000 --- a/common/client/poller.go +++ /dev/null @@ -1,98 +0,0 @@ -package client - -import ( - "context" - "sync" - "time" - - "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/chainlink-common/pkg/services" - - "github.com/smartcontractkit/chainlink/v2/common/types" -) - -// Poller is a component that polls a function at a given interval -// and delivers the result to a channel. It is used by multinode to poll -// for new heads and implements the Subscription interface. -type Poller[T any] struct { - services.StateMachine - pollingInterval time.Duration - pollingFunc func(ctx context.Context) (T, error) - pollingTimeout time.Duration - logger logger.Logger - channel chan<- T - errCh chan error - - stopCh services.StopChan - wg sync.WaitGroup -} - -// NewPoller creates a new Poller instance -func NewPoller[ - T any, -](pollingInterval time.Duration, pollingFunc func(ctx context.Context) (T, error), pollingTimeout time.Duration, channel chan<- T, logger logger.Logger) Poller[T] { - return Poller[T]{ - pollingInterval: pollingInterval, - pollingFunc: pollingFunc, - pollingTimeout: pollingTimeout, - channel: channel, - logger: logger, - errCh: make(chan error), - stopCh: make(chan struct{}), - } -} - -var _ types.Subscription = &Poller[any]{} - -func (p *Poller[T]) Start() error { - return p.StartOnce("Poller", func() error { - p.wg.Add(1) - go p.pollingLoop() - return nil - }) -} - -// Unsubscribe cancels the sending of events to the data channel -func (p *Poller[T]) Unsubscribe() { - _ = p.StopOnce("Poller", func() error { - close(p.stopCh) - p.wg.Wait() - close(p.errCh) - return nil - }) -} - -func (p *Poller[T]) Err() <-chan error { - return p.errCh -} - -func (p *Poller[T]) pollingLoop() { - defer p.wg.Done() - - ticker := time.NewTicker(p.pollingInterval) - defer ticker.Stop() - - for { - select { - case <-p.stopCh: - return - case <-ticker.C: - // Set polling timeout - pollingCtx, cancelPolling := context.WithTimeout(context.Background(), p.pollingTimeout) - p.stopCh.CtxCancel(pollingCtx, cancelPolling) - // Execute polling function - result, err := p.pollingFunc(pollingCtx) - cancelPolling() - if err != nil { - p.logger.Warnf("polling error: %v", err) - continue - } - // Send result to channel or block if channel is full - select { - case p.channel <- result: - case <-p.stopCh: - return - } - } - } -} diff --git a/common/client/poller_test.go b/common/client/poller_test.go deleted file mode 100644 index 3f11c759adb..00000000000 --- a/common/client/poller_test.go +++ /dev/null @@ -1,207 +0,0 @@ -package client - -import ( - "context" - "fmt" - "math/big" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - - "github.com/smartcontractkit/chainlink-common/pkg/logger" -) - -func Test_Poller(t *testing.T) { - lggr := logger.Test(t) - - t.Run("Test multiple start", func(t *testing.T) { - pollFunc := func(ctx context.Context) (Head, error) { - return nil, nil - } - - channel := make(chan Head, 1) - defer close(channel) - - poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, lggr) - err := poller.Start() - require.NoError(t, err) - - err = poller.Start() - require.Error(t, err) - poller.Unsubscribe() - }) - - t.Run("Test polling for heads", func(t *testing.T) { - // Mock polling function that returns a new value every time it's called - var pollNumber int - pollLock := sync.Mutex{} - pollFunc := func(ctx context.Context) (Head, error) { - pollLock.Lock() - defer pollLock.Unlock() - pollNumber++ - h := head{ - BlockNumber: int64(pollNumber), - BlockDifficulty: big.NewInt(int64(pollNumber)), - } - return h.ToMockHead(t), nil - } - - // data channel to receive updates from the poller - channel := make(chan Head, 1) - defer close(channel) - - // Create poller and start to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, lggr) - require.NoError(t, poller.Start()) - defer poller.Unsubscribe() - - // Receive updates from the poller - pollCount := 0 - pollMax := 50 - for ; pollCount < pollMax; pollCount++ { - h := <-channel - assert.Equal(t, int64(pollCount+1), h.BlockNumber()) - } - }) - - t.Run("Test polling errors", func(t *testing.T) { - // Mock polling function that returns an error - var pollNumber int - pollLock := sync.Mutex{} - pollFunc := func(ctx context.Context) (Head, error) { - pollLock.Lock() - defer pollLock.Unlock() - pollNumber++ - return nil, fmt.Errorf("polling error %d", pollNumber) - } - - // data channel to receive updates from the poller - channel := make(chan Head, 1) - defer close(channel) - - olggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) - - // Create poller and subscribe to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, olggr) - require.NoError(t, poller.Start()) - defer poller.Unsubscribe() - - // Ensure that all errors were logged as expected - logsSeen := func() bool { - for pollCount := 0; pollCount < 50; pollCount++ { - numLogs := observedLogs.FilterMessage(fmt.Sprintf("polling error: polling error %d", pollCount+1)).Len() - if numLogs != 1 { - return false - } - } - return true - } - require.Eventually(t, logsSeen, time.Second, time.Millisecond) - }) - - t.Run("Test polling timeout", func(t *testing.T) { - pollFunc := func(ctx context.Context) (Head, error) { - if <-ctx.Done(); true { - return nil, ctx.Err() - } - return nil, nil - } - - // Set instant timeout - pollingTimeout := time.Duration(0) - - // data channel to receive updates from the poller - channel := make(chan Head, 1) - defer close(channel) - - olggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) - - // Create poller and subscribe to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, pollingTimeout, channel, olggr) - require.NoError(t, poller.Start()) - defer poller.Unsubscribe() - - // Ensure that timeout errors were logged as expected - logsSeen := func() bool { - return observedLogs.FilterMessage("polling error: context deadline exceeded").Len() >= 1 - } - require.Eventually(t, logsSeen, time.Second, time.Millisecond) - }) - - t.Run("Test unsubscribe during polling", func(t *testing.T) { - wait := make(chan struct{}) - pollFunc := func(ctx context.Context) (Head, error) { - close(wait) - // Block in polling function until context is cancelled - if <-ctx.Done(); true { - return nil, ctx.Err() - } - return nil, nil - } - - // Set long timeout - pollingTimeout := time.Minute - - // data channel to receive updates from the poller - channel := make(chan Head, 1) - defer close(channel) - - olggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) - - // Create poller and subscribe to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, pollingTimeout, channel, olggr) - require.NoError(t, poller.Start()) - - // Unsubscribe while blocked in polling function - <-wait - poller.Unsubscribe() - - // Ensure error was logged - logsSeen := func() bool { - return observedLogs.FilterMessage("polling error: context canceled").Len() >= 1 - } - require.Eventually(t, logsSeen, time.Second, time.Millisecond) - }) -} - -func Test_Poller_Unsubscribe(t *testing.T) { - lggr := logger.Test(t) - pollFunc := func(ctx context.Context) (Head, error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - h := head{ - BlockNumber: 0, - BlockDifficulty: big.NewInt(0), - } - return h.ToMockHead(t), nil - } - } - - t.Run("Test multiple unsubscribe", func(t *testing.T) { - channel := make(chan Head, 1) - poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, lggr) - err := poller.Start() - require.NoError(t, err) - - <-channel - poller.Unsubscribe() - poller.Unsubscribe() - }) - - t.Run("Test unsubscribe with closed channel", func(t *testing.T) { - channel := make(chan Head, 1) - poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, lggr) - err := poller.Start() - require.NoError(t, err) - - <-channel - close(channel) - poller.Unsubscribe() - }) -} From 386e26cd0b9f4992d3cb511cb9fb39a129ccd8ac Mon Sep 17 00:00:00 2001 From: HenryNguyen5 <6404866+HenryNguyen5@users.noreply.github.com> Date: Fri, 26 Apr 2024 22:07:50 -0700 Subject: [PATCH 2/3] Revert "Update log trigger log provider readMaxBatchSize to 56 (#12951)" This reverts commit c98ea6413dcdc02a7d0c82b9b36d3fce97dac94b. --- .changeset/tidy-trees-tie.md | 5 ----- .../ocr2keeper/evmregistry/v21/logprovider/provider.go | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) delete mode 100644 .changeset/tidy-trees-tie.md diff --git a/.changeset/tidy-trees-tie.md b/.changeset/tidy-trees-tie.md deleted file mode 100644 index 7ff415e9de4..00000000000 --- a/.changeset/tidy-trees-tie.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"chainlink": patch ---- - -#changed Updating the log trigger log provider's readMaxBatchSize to 56 diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index e2c1a1531e2..b07b08d3354 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -42,7 +42,7 @@ var ( readJobQueueSize = 64 readLogsTimeout = 10 * time.Second - readMaxBatchSize = 56 + readMaxBatchSize = 32 // reorgBuffer is the number of blocks to add as a buffer to the block range when reading logs. reorgBuffer = int64(32) readerThreads = 4 From ba89864f3e2db46fb4de88c5c690d9e461c94e7c Mon Sep 17 00:00:00 2001 From: HenryNguyen5 <6404866+HenryNguyen5@users.noreply.github.com> Date: Fri, 26 Apr 2024 22:08:31 -0700 Subject: [PATCH 3/3] Revert "Wrap RPC errors (#12638)" This reverts commit bcf76534862b32503f4192e38b7e1cb4dd7e312d. --- .changeset/quick-fishes-heal.md | 5 - common/client/models.go | 29 ---- common/client/models_test.go | 16 -- common/client/multi_node.go | 13 +- common/client/multi_node_test.go | 20 +-- core/chains/evm/client/rpc_client.go | 157 ++++++++++++------ core/chains/evm/client/sub_error_wrapper.go | 77 --------- .../evm/client/sub_error_wrapper_test.go | 75 --------- .../chains/evm/gas/block_history_estimator.go | 2 +- 9 files changed, 119 insertions(+), 275 deletions(-) delete mode 100644 .changeset/quick-fishes-heal.md delete mode 100644 common/client/models_test.go delete mode 100644 core/chains/evm/client/sub_error_wrapper.go delete mode 100644 core/chains/evm/client/sub_error_wrapper_test.go diff --git a/.changeset/quick-fishes-heal.md b/.changeset/quick-fishes-heal.md deleted file mode 100644 index 966e74c843a..00000000000 --- a/.changeset/quick-fishes-heal.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"chainlink": patch ---- -#changed -Added prefix `RPCClient returned error ({RPC_NAME})` to RPC errors to simplify filtering of RPC related issues. diff --git a/common/client/models.go b/common/client/models.go index fd0c3915940..66f1e9cf88b 100644 --- a/common/client/models.go +++ b/common/client/models.go @@ -28,35 +28,6 @@ var sendTxSevereErrors = []SendTxReturnCode{Fatal, Underpriced, Unsupported, Exc // sendTxSuccessfulCodes - error codes which signal that transaction was accepted by the node var sendTxSuccessfulCodes = []SendTxReturnCode{Successful, TransactionAlreadyKnown} -func (c SendTxReturnCode) String() string { - switch c { - case Successful: - return "Successful" - case Fatal: - return "Fatal" - case Retryable: - return "Retryable" - case Underpriced: - return "Underpriced" - case Unknown: - return "Unknown" - case Unsupported: - return "Unsupported" - case TransactionAlreadyKnown: - return "TransactionAlreadyKnown" - case InsufficientFunds: - return "InsufficientFunds" - case ExceedsMaxFee: - return "ExceedsMaxFee" - case FeeOutOfValidRange: - return "FeeOutOfValidRange" - case OutOfCounters: - return "OutOfCounters" - default: - return fmt.Sprintf("SendTxReturnCode(%d)", c) - } -} - type NodeTier int const ( diff --git a/common/client/models_test.go b/common/client/models_test.go deleted file mode 100644 index 2d5dc31b373..00000000000 --- a/common/client/models_test.go +++ /dev/null @@ -1,16 +0,0 @@ -package client - -import ( - "strings" - "testing" -) - -func TestSendTxReturnCode_String(t *testing.T) { - // ensure all the SendTxReturnCodes have proper name - for c := 1; c < int(sendTxReturnCodeLen); c++ { - strC := SendTxReturnCode(c).String() - if strings.Contains(strC, "SendTxReturnCode(") { - t.Errorf("Expected %s to have a proper string representation", strC) - } - } -} diff --git a/common/client/multi_node.go b/common/client/multi_node.go index fa413df91aa..cc8daed599c 100644 --- a/common/client/multi_node.go +++ b/common/client/multi_node.go @@ -561,13 +561,6 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP return n.RPC().PendingSequenceAt(ctx, addr) } -type sendTxErrors map[SendTxReturnCode][]error - -// String - returns string representation of the errors map. Required by logger to properly represent the value -func (errs sendTxErrors) String() string { - return fmt.Sprint(map[SendTxReturnCode][]error(errs)) -} - func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT, BATCH_ELEM]) SendEmptyTransaction( ctx context.Context, newTxAttempt func(seq SEQ, feeLimit uint32, fee FEE, fromAddress ADDR) (attempt any, err error), @@ -609,7 +602,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP ctx, cancel := c.chStop.Ctx(ctx) defer cancel() requiredResults := int(math.Ceil(float64(healthyNodesNum) * sendTxQuorum)) - errorsByCode := sendTxErrors{} + errorsByCode := map[SendTxReturnCode][]error{} var softTimeoutChan <-chan time.Time var resultsCount int loop: @@ -646,7 +639,7 @@ loop: func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT, BATCH_ELEM]) reportSendTxAnomalies(tx TX, txResults <-chan sendTxResult) { defer c.wg.Done() - resultsByCode := sendTxErrors{} + resultsByCode := map[SendTxReturnCode][]error{} // txResults eventually will be closed for txResult := range txResults { resultsByCode[txResult.ResultCode] = append(resultsByCode[txResult.ResultCode], txResult.Err) @@ -660,7 +653,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP } } -func aggregateTxResults(resultsByCode sendTxErrors) (txResult error, err error) { +func aggregateTxResults(resultsByCode map[SendTxReturnCode][]error) (txResult error, err error) { severeErrors, hasSevereErrors := findFirstIn(resultsByCode, sendTxSevereErrors) successResults, hasSuccess := findFirstIn(resultsByCode, sendTxSuccessfulCodes) if hasSuccess { diff --git a/common/client/multi_node_test.go b/common/client/multi_node_test.go index 9f6904fcaf2..9c09bd57d70 100644 --- a/common/client/multi_node_test.go +++ b/common/client/multi_node_test.go @@ -796,13 +796,13 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { Name string ExpectedTxResult string ExpectedCriticalErr string - ResultsByCode sendTxErrors + ResultsByCode map[SendTxReturnCode][]error }{ { Name: "Returns success and logs critical error on success and Fatal", ExpectedTxResult: "success", ExpectedCriticalErr: "found contradictions in nodes replies on SendTransaction: got success and severe error", - ResultsByCode: sendTxErrors{ + ResultsByCode: map[SendTxReturnCode][]error{ Successful: {errors.New("success")}, Fatal: {errors.New("fatal")}, }, @@ -811,7 +811,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { Name: "Returns TransactionAlreadyKnown and logs critical error on TransactionAlreadyKnown and Fatal", ExpectedTxResult: "tx_already_known", ExpectedCriticalErr: "found contradictions in nodes replies on SendTransaction: got success and severe error", - ResultsByCode: sendTxErrors{ + ResultsByCode: map[SendTxReturnCode][]error{ TransactionAlreadyKnown: {errors.New("tx_already_known")}, Unsupported: {errors.New("unsupported")}, }, @@ -820,7 +820,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { Name: "Prefers sever error to temporary", ExpectedTxResult: "underpriced", ExpectedCriticalErr: "", - ResultsByCode: sendTxErrors{ + ResultsByCode: map[SendTxReturnCode][]error{ Retryable: {errors.New("retryable")}, Underpriced: {errors.New("underpriced")}, }, @@ -829,7 +829,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { Name: "Returns temporary error", ExpectedTxResult: "retryable", ExpectedCriticalErr: "", - ResultsByCode: sendTxErrors{ + ResultsByCode: map[SendTxReturnCode][]error{ Retryable: {errors.New("retryable")}, }, }, @@ -837,7 +837,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { Name: "Insufficient funds is treated as error", ExpectedTxResult: "", ExpectedCriticalErr: "", - ResultsByCode: sendTxErrors{ + ResultsByCode: map[SendTxReturnCode][]error{ Successful: {nil}, InsufficientFunds: {errors.New("insufficientFunds")}, }, @@ -846,13 +846,13 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { Name: "Logs critical error on empty ResultsByCode", ExpectedTxResult: "expected at least one response on SendTransaction", ExpectedCriticalErr: "expected at least one response on SendTransaction", - ResultsByCode: sendTxErrors{}, + ResultsByCode: map[SendTxReturnCode][]error{}, }, { Name: "Zk out of counter error", ExpectedTxResult: "not enough keccak counters to continue the execution", ExpectedCriticalErr: "", - ResultsByCode: sendTxErrors{ + ResultsByCode: map[SendTxReturnCode][]error{ OutOfCounters: {errors.New("not enough keccak counters to continue the execution")}, }, }, @@ -870,9 +870,6 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { assert.EqualError(t, txResult, testCase.ExpectedTxResult) } - logger.Sugared(logger.Test(t)).Info("Map: " + fmt.Sprint(testCase.ResultsByCode)) - logger.Sugared(logger.Test(t)).Criticalw("observed invariant violation on SendTransaction", "resultsByCode", testCase.ResultsByCode, "err", err) - if testCase.ExpectedCriticalErr == "" { assert.NoError(t, err) } else { @@ -887,4 +884,5 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { delete(codesToCover, codeToIgnore) } assert.Empty(t, codesToCover, "all of the SendTxReturnCode must be covered by this test") + } diff --git a/core/chains/evm/client/rpc_client.go b/core/chains/evm/client/rpc_client.go index 548acf3206c..255b038037a 100644 --- a/core/chains/evm/client/rpc_client.go +++ b/core/chains/evm/client/rpc_client.go @@ -130,7 +130,7 @@ func (r *rpcClient) Dial(callerCtx context.Context) error { wsrpc, err := rpc.DialWebsocket(ctx, r.ws.uri.String(), "") if err != nil { promEVMPoolRPCNodeDialsFailed.WithLabelValues(r.chainID.String(), r.name).Inc() - return r.wrapRPCClientError(pkgerrors.Wrapf(err, "error while dialing websocket: %v", r.ws.uri.Redacted())) + return pkgerrors.Wrapf(err, "error while dialing websocket: %v", r.ws.uri.Redacted()) } r.ws.rpc = wsrpc @@ -159,7 +159,7 @@ func (r *rpcClient) DialHTTP() error { httprpc, err := rpc.DialHTTP(r.http.uri.String()) if err != nil { promEVMPoolRPCNodeDialsFailed.WithLabelValues(r.chainID.String(), r.name).Inc() - return r.wrapRPCClientError(pkgerrors.Wrapf(err, "error while dialing HTTP: %v", r.http.uri.Redacted())) + return pkgerrors.Wrapf(err, "error while dialing HTTP: %v", r.http.uri.Redacted()) } r.http.rpc = httprpc @@ -295,7 +295,10 @@ func (r *rpcClient) UnsubscribeAllExceptAliveLoop() { // CallContext implementation func (r *rpcClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return err + } defer cancel() lggr := r.newRqLggr().With( "method", method, @@ -304,7 +307,6 @@ func (r *rpcClient) CallContext(ctx context.Context, result interface{}, method lggr.Debug("RPC call: evmclient.Client#CallContext") start := time.Now() - var err error if http != nil { err = r.wrapHTTP(http.rpc.CallContext(ctx, result, method, args...)) } else { @@ -318,13 +320,15 @@ func (r *rpcClient) CallContext(ctx context.Context, result interface{}, method } func (r *rpcClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return err + } defer cancel() lggr := r.newRqLggr().With("nBatchElems", len(b), "batchElems", b) lggr.Trace("RPC call: evmclient.Client#BatchCallContext") start := time.Now() - var err error if http != nil { err = r.wrapHTTP(http.rpc.BatchCallContext(ctx, b)) } else { @@ -338,23 +342,24 @@ func (r *rpcClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) err } func (r *rpcClient) Subscribe(ctx context.Context, channel chan<- *evmtypes.Head, args ...interface{}) (commontypes.Subscription, error) { - ctx, cancel, ws, _ := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, _, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("args", args) lggr.Debug("RPC call: evmclient.Client#EthSubscribe") start := time.Now() - var sub commontypes.Subscription sub, err := ws.rpc.EthSubscribe(ctx, channel, args...) if err == nil { - sub = newSubscriptionErrorWrapper(sub, r.rpcClientErrorPrefix()) r.registerSub(sub) } duration := time.Since(start) r.logResult(lggr, err, duration, r.getRPCDomain(), "EthSubscribe") - return sub, r.wrapWS(err) + return sub, err } // GethClient wrappers @@ -365,14 +370,17 @@ func (r *rpcClient) TransactionReceipt(ctx context.Context, txHash common.Hash) return nil, err } if receipt == nil { - err = r.wrapRPCClientError(ethereum.NotFound) + err = ethereum.NotFound return } return } func (r *rpcClient) TransactionReceiptGeth(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("txHash", txHash) @@ -395,7 +403,10 @@ func (r *rpcClient) TransactionReceiptGeth(ctx context.Context, txHash common.Ha return } func (r *rpcClient) TransactionByHash(ctx context.Context, txHash common.Hash) (tx *types.Transaction, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("txHash", txHash) @@ -419,7 +430,10 @@ func (r *rpcClient) TransactionByHash(ctx context.Context, txHash common.Hash) ( } func (r *rpcClient) HeaderByNumber(ctx context.Context, number *big.Int) (header *types.Header, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("number", number) @@ -440,7 +454,10 @@ func (r *rpcClient) HeaderByNumber(ctx context.Context, number *big.Int) (header } func (r *rpcClient) HeaderByHash(ctx context.Context, hash common.Hash) (header *types.Header, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("hash", hash) @@ -477,7 +494,7 @@ func (r *rpcClient) blockByNumber(ctx context.Context, number string) (head *evm return nil, err } if head == nil { - err = r.wrapRPCClientError(ethereum.NotFound) + err = ethereum.NotFound return } head.EVMChainID = ubig.New(r.chainID) @@ -490,7 +507,7 @@ func (r *rpcClient) BlockByHash(ctx context.Context, hash common.Hash) (head *ev return nil, err } if head == nil { - err = r.wrapRPCClientError(ethereum.NotFound) + err = ethereum.NotFound return } head.EVMChainID = ubig.New(r.chainID) @@ -498,7 +515,10 @@ func (r *rpcClient) BlockByHash(ctx context.Context, hash common.Hash) (head *ev } func (r *rpcClient) BlockByHashGeth(ctx context.Context, hash common.Hash) (block *types.Block, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("hash", hash) @@ -521,7 +541,10 @@ func (r *rpcClient) BlockByHashGeth(ctx context.Context, hash common.Hash) (bloc } func (r *rpcClient) BlockByNumberGeth(ctx context.Context, number *big.Int) (block *types.Block, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("number", number) @@ -544,13 +567,15 @@ func (r *rpcClient) BlockByNumberGeth(ctx context.Context, number *big.Int) (blo } func (r *rpcClient) SendTransaction(ctx context.Context, tx *types.Transaction) error { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return err + } defer cancel() lggr := r.newRqLggr().With("tx", tx) lggr.Debug("RPC call: evmclient.Client#SendTransaction") start := time.Now() - var err error if http != nil { err = r.wrapHTTP(http.geth.SendTransaction(ctx, tx)) } else { @@ -582,7 +607,10 @@ func (r *rpcClient) SendEmptyTransaction( // PendingSequenceAt returns one higher than the highest nonce from both mempool and mined transactions func (r *rpcClient) PendingSequenceAt(ctx context.Context, account common.Address) (nonce evmtypes.Nonce, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return 0, err + } defer cancel() lggr := r.newRqLggr().With("account", account) @@ -611,7 +639,10 @@ func (r *rpcClient) PendingSequenceAt(ctx context.Context, account common.Addres // mined nonce at the given block number, but it actually returns the total // transaction count which is the highest mined nonce + 1 func (r *rpcClient) SequenceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (nonce evmtypes.Nonce, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return 0, err + } defer cancel() lggr := r.newRqLggr().With("account", account, "blockNumber", blockNumber) @@ -637,7 +668,10 @@ func (r *rpcClient) SequenceAt(ctx context.Context, account common.Address, bloc } func (r *rpcClient) PendingCodeAt(ctx context.Context, account common.Address) (code []byte, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("account", account) @@ -660,7 +694,10 @@ func (r *rpcClient) PendingCodeAt(ctx context.Context, account common.Address) ( } func (r *rpcClient) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) (code []byte, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("account", account, "blockNumber", blockNumber) @@ -683,7 +720,10 @@ func (r *rpcClient) CodeAt(ctx context.Context, account common.Address, blockNum } func (r *rpcClient) EstimateGas(ctx context.Context, c interface{}) (gas uint64, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return 0, err + } defer cancel() call := c.(ethereum.CallMsg) lggr := r.newRqLggr().With("call", call) @@ -707,7 +747,10 @@ func (r *rpcClient) EstimateGas(ctx context.Context, c interface{}) (gas uint64, } func (r *rpcClient) SuggestGasPrice(ctx context.Context) (price *big.Int, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr() @@ -730,7 +773,10 @@ func (r *rpcClient) SuggestGasPrice(ctx context.Context) (price *big.Int, err er } func (r *rpcClient) CallContract(ctx context.Context, msg interface{}, blockNumber *big.Int) (val []byte, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("callMsg", msg, "blockNumber", blockNumber) message := msg.(ethereum.CallMsg) @@ -758,7 +804,10 @@ func (r *rpcClient) CallContract(ctx context.Context, msg interface{}, blockNumb } func (r *rpcClient) PendingCallContract(ctx context.Context, msg interface{}) (val []byte, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("callMsg", msg) message := msg.(ethereum.CallMsg) @@ -792,7 +841,10 @@ func (r *rpcClient) LatestBlockHeight(ctx context.Context) (*big.Int, error) { } func (r *rpcClient) BlockNumber(ctx context.Context) (height uint64, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return 0, err + } defer cancel() lggr := r.newRqLggr() @@ -815,7 +867,10 @@ func (r *rpcClient) BlockNumber(ctx context.Context) (height uint64, err error) } func (r *rpcClient) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (balance *big.Int, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("account", account.Hex(), "blockNumber", blockNumber) @@ -852,7 +907,7 @@ func (r *rpcClient) TokenBalance(ctx context.Context, address common.Address, co return numLinkBigInt, err } if _, ok := numLinkBigInt.SetString(result, 0); !ok { - return nil, r.wrapRPCClientError(fmt.Errorf("failed to parse int: %s", result)) + return nil, fmt.Errorf("failed to parse int: %s", result) } return numLinkBigInt, nil } @@ -871,7 +926,10 @@ func (r *rpcClient) FilterEvents(ctx context.Context, q ethereum.FilterQuery) ([ } func (r *rpcClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (l []types.Log, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("q", q) @@ -899,7 +957,10 @@ func (r *rpcClient) ClientVersion(ctx context.Context) (version string, err erro } func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (sub ethereum.Subscription, err error) { - ctx, cancel, ws, _ := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, _, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("q", q) @@ -907,7 +968,6 @@ func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQu start := time.Now() sub, err = ws.geth.SubscribeFilterLogs(ctx, q, ch) if err == nil { - sub = newSubscriptionErrorWrapper(sub, r.rpcClientErrorPrefix()) r.registerSub(sub) } err = r.wrapWS(err) @@ -919,7 +979,10 @@ func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQu } func (r *rpcClient) SuggestGasTipCap(ctx context.Context) (tipCap *big.Int, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr() @@ -944,7 +1007,7 @@ func (r *rpcClient) SuggestGasTipCap(ctx context.Context) (tipCap *big.Int, err // Returns the ChainID according to the geth client. This is useful for functions like verify() // the common node. func (r *rpcClient) ChainID(ctx context.Context) (chainID *big.Int, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() @@ -963,15 +1026,6 @@ func (r *rpcClient) newRqLggr() logger.SugaredLogger { return r.rpcLog.With("requestID", uuid.New()) } -func (r *rpcClient) wrapRPCClientError(err error) error { - // simple add msg to the error without adding new stack trace - return pkgerrors.WithMessage(err, r.rpcClientErrorPrefix()) -} - -func (r *rpcClient) rpcClientErrorPrefix() string { - return fmt.Sprintf("RPCClient returned error (%s)", r.name) -} - func wrapCallError(err error, tp string) error { if err == nil { return nil @@ -984,12 +1038,11 @@ func wrapCallError(err error, tp string) error { func (r *rpcClient) wrapWS(err error) error { err = wrapCallError(err, fmt.Sprintf("%s websocket (%s)", r.tier.String(), r.ws.uri.Redacted())) - return r.wrapRPCClientError(err) + return err } func (r *rpcClient) wrapHTTP(err error) error { err = wrapCallError(err, fmt.Sprintf("%s http (%s)", r.tier.String(), r.http.uri.Redacted())) - err = r.wrapRPCClientError(err) if err != nil { r.rpcLog.Debugw("Call failed", "err", err) } else { @@ -999,7 +1052,7 @@ func (r *rpcClient) wrapHTTP(err error) error { } // makeLiveQueryCtxAndSafeGetClients wraps makeQueryCtx -func (r *rpcClient) makeLiveQueryCtxAndSafeGetClients(parentCtx context.Context) (ctx context.Context, cancel context.CancelFunc, ws rawclient, http *rawclient) { +func (r *rpcClient) makeLiveQueryCtxAndSafeGetClients(parentCtx context.Context) (ctx context.Context, cancel context.CancelFunc, ws rawclient, http *rawclient, err error) { // Need to wrap in mutex because state transition can cancel and replace the // context r.stateMu.RLock() @@ -1019,14 +1072,16 @@ func (r *rpcClient) makeQueryCtx(ctx context.Context) (context.Context, context. } func (r *rpcClient) IsSyncing(ctx context.Context) (bool, error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return false, err + } defer cancel() lggr := r.newRqLggr() lggr.Debug("RPC call: evmclient.Client#SyncProgress") var syncProgress *ethereum.SyncProgress start := time.Now() - var err error if http != nil { syncProgress, err = http.geth.SyncProgress(ctx) err = r.wrapHTTP(err) diff --git a/core/chains/evm/client/sub_error_wrapper.go b/core/chains/evm/client/sub_error_wrapper.go deleted file mode 100644 index 689991ce70f..00000000000 --- a/core/chains/evm/client/sub_error_wrapper.go +++ /dev/null @@ -1,77 +0,0 @@ -package client - -import ( - "fmt" - - commontypes "github.com/smartcontractkit/chainlink/v2/common/types" -) - -// subErrorWrapper - adds specified prefix to a subscription error -type subErrorWrapper struct { - sub commontypes.Subscription - errorPrefix string - - done chan struct{} - unSub chan struct{} - errorCh chan error -} - -func newSubscriptionErrorWrapper(sub commontypes.Subscription, errorPrefix string) *subErrorWrapper { - s := &subErrorWrapper{ - sub: sub, - errorPrefix: errorPrefix, - done: make(chan struct{}), - unSub: make(chan struct{}), - errorCh: make(chan error), - } - - go func() { - for { - select { - // sub.Err channel is closed by sub.Unsubscribe - case err, ok := <-sub.Err(): - if !ok { - // might only happen if someone terminated wrapped subscription - // in any case - do our best to release resources - // we can't call Unsubscribe on root sub as this might cause panic - close(s.errorCh) - close(s.done) - return - } - - select { - case s.errorCh <- fmt.Errorf("%s: %w", s.errorPrefix, err): - case <-s.unSub: - s.close() - return - } - case <-s.unSub: - s.close() - return - } - } - }() - - return s -} - -func (s *subErrorWrapper) close() { - s.sub.Unsubscribe() - close(s.errorCh) - close(s.done) -} - -func (s *subErrorWrapper) Unsubscribe() { - select { - // already unsubscribed - case <-s.done: - // signal unsubscribe - case s.unSub <- struct{}{}: - // wait for unsubscribe to complete - <-s.done - } -} - -func (s *subErrorWrapper) Err() <-chan error { - return s.errorCh -} diff --git a/core/chains/evm/client/sub_error_wrapper_test.go b/core/chains/evm/client/sub_error_wrapper_test.go deleted file mode 100644 index 457d392a50e..00000000000 --- a/core/chains/evm/client/sub_error_wrapper_test.go +++ /dev/null @@ -1,75 +0,0 @@ -package client - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" -) - -func TestSubscriptionErrorWrapper(t *testing.T) { - t.Parallel() - t.Run("Unsubscribe wrapper releases resources", func(t *testing.T) { - t.Parallel() - - mockedSub := NewMockSubscription() - const prefix = "RPC returned error" - wrapper := newSubscriptionErrorWrapper(mockedSub, prefix) - wrapper.Unsubscribe() - - // mock's resources were relased - assert.True(t, mockedSub.unsubscribed) - _, ok := <-mockedSub.Err() - assert.False(t, ok) - // wrapper's channels are closed - _, ok = <-wrapper.Err() - assert.False(t, ok) - // subsequence unsubscribe does not causes panic - wrapper.Unsubscribe() - }) - t.Run("Unsubscribe interrupts error delivery", func(t *testing.T) { - t.Parallel() - sub := NewMockSubscription() - const prefix = "RPC returned error" - wrapper := newSubscriptionErrorWrapper(sub, prefix) - sub.Errors <- fmt.Errorf("error") - - wrapper.Unsubscribe() - _, ok := <-wrapper.Err() - assert.False(t, ok) - }) - t.Run("Successfully wraps error", func(t *testing.T) { - t.Parallel() - sub := NewMockSubscription() - const prefix = "RPC returned error" - wrapper := newSubscriptionErrorWrapper(sub, prefix) - sub.Errors <- fmt.Errorf("root error") - - err, ok := <-wrapper.Err() - assert.True(t, ok) - assert.Equal(t, "RPC returned error: root error", err.Error()) - - wrapper.Unsubscribe() - _, ok = <-wrapper.Err() - assert.False(t, ok) - }) - t.Run("Unsubscribe on root does not cause panic", func(t *testing.T) { - t.Parallel() - mockedSub := NewMockSubscription() - wrapper := newSubscriptionErrorWrapper(mockedSub, "") - - mockedSub.Unsubscribe() - // mock's resources were released - assert.True(t, mockedSub.unsubscribed) - _, ok := <-mockedSub.Err() - assert.False(t, ok) - // wrapper's channels are eventually closed - tests.AssertEventually(t, func() bool { - _, ok = <-wrapper.Err() - return !ok - }) - - }) -} diff --git a/core/chains/evm/gas/block_history_estimator.go b/core/chains/evm/gas/block_history_estimator.go index 0ae067e45bf..8b8c626f725 100644 --- a/core/chains/evm/gas/block_history_estimator.go +++ b/core/chains/evm/gas/block_history_estimator.go @@ -721,7 +721,7 @@ func (b *BlockHistoryEstimator) batchFetch(ctx context.Context, reqs []rpc.Batch err := b.ethClient.BatchCallContext(ctx, reqs[i:j]) if pkgerrors.Is(err, context.DeadlineExceeded) { // We ran out of time, return what we have - b.logger.Warnf("Batch fetching timed out; loaded %d/%d results: %v", i, len(reqs), err) + b.logger.Warnf("Batch fetching timed out; loaded %d/%d results", i, len(reqs)) for k := i; k < len(reqs); k++ { if k < j { reqs[k].Error = pkgerrors.Wrap(err, "request failed")