From 540d54b2dc1708c7e75131b5ae920c22d72c8177 Mon Sep 17 00:00:00 2001 From: HenryNguyen5 <6404866+HenryNguyen5@users.noreply.github.com> Date: Fri, 26 Apr 2024 22:05:57 -0700 Subject: [PATCH 1/8] Revert "Create HeadPoller for Multi-Node (#12871)" This reverts commit 7338448469d014b4b1eb5aaedf0303f687e45f62. --- common/client/poller.go | 98 ----------------- common/client/poller_test.go | 207 ----------------------------------- 2 files changed, 305 deletions(-) delete mode 100644 common/client/poller.go delete mode 100644 common/client/poller_test.go diff --git a/common/client/poller.go b/common/client/poller.go deleted file mode 100644 index b21f28fe604..00000000000 --- a/common/client/poller.go +++ /dev/null @@ -1,98 +0,0 @@ -package client - -import ( - "context" - "sync" - "time" - - "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/chainlink-common/pkg/services" - - "github.com/smartcontractkit/chainlink/v2/common/types" -) - -// Poller is a component that polls a function at a given interval -// and delivers the result to a channel. It is used by multinode to poll -// for new heads and implements the Subscription interface. -type Poller[T any] struct { - services.StateMachine - pollingInterval time.Duration - pollingFunc func(ctx context.Context) (T, error) - pollingTimeout time.Duration - logger logger.Logger - channel chan<- T - errCh chan error - - stopCh services.StopChan - wg sync.WaitGroup -} - -// NewPoller creates a new Poller instance -func NewPoller[ - T any, -](pollingInterval time.Duration, pollingFunc func(ctx context.Context) (T, error), pollingTimeout time.Duration, channel chan<- T, logger logger.Logger) Poller[T] { - return Poller[T]{ - pollingInterval: pollingInterval, - pollingFunc: pollingFunc, - pollingTimeout: pollingTimeout, - channel: channel, - logger: logger, - errCh: make(chan error), - stopCh: make(chan struct{}), - } -} - -var _ types.Subscription = &Poller[any]{} - -func (p *Poller[T]) Start() error { - return p.StartOnce("Poller", func() error { - p.wg.Add(1) - go p.pollingLoop() - return nil - }) -} - -// Unsubscribe cancels the sending of events to the data channel -func (p *Poller[T]) Unsubscribe() { - _ = p.StopOnce("Poller", func() error { - close(p.stopCh) - p.wg.Wait() - close(p.errCh) - return nil - }) -} - -func (p *Poller[T]) Err() <-chan error { - return p.errCh -} - -func (p *Poller[T]) pollingLoop() { - defer p.wg.Done() - - ticker := time.NewTicker(p.pollingInterval) - defer ticker.Stop() - - for { - select { - case <-p.stopCh: - return - case <-ticker.C: - // Set polling timeout - pollingCtx, cancelPolling := context.WithTimeout(context.Background(), p.pollingTimeout) - p.stopCh.CtxCancel(pollingCtx, cancelPolling) - // Execute polling function - result, err := p.pollingFunc(pollingCtx) - cancelPolling() - if err != nil { - p.logger.Warnf("polling error: %v", err) - continue - } - // Send result to channel or block if channel is full - select { - case p.channel <- result: - case <-p.stopCh: - return - } - } - } -} diff --git a/common/client/poller_test.go b/common/client/poller_test.go deleted file mode 100644 index 3f11c759adb..00000000000 --- a/common/client/poller_test.go +++ /dev/null @@ -1,207 +0,0 @@ -package client - -import ( - "context" - "fmt" - "math/big" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - - "github.com/smartcontractkit/chainlink-common/pkg/logger" -) - -func Test_Poller(t *testing.T) { - lggr := logger.Test(t) - - t.Run("Test multiple start", func(t *testing.T) { - pollFunc := func(ctx context.Context) (Head, error) { - return nil, nil - } - - channel := make(chan Head, 1) - defer close(channel) - - poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, lggr) - err := poller.Start() - require.NoError(t, err) - - err = poller.Start() - require.Error(t, err) - poller.Unsubscribe() - }) - - t.Run("Test polling for heads", func(t *testing.T) { - // Mock polling function that returns a new value every time it's called - var pollNumber int - pollLock := sync.Mutex{} - pollFunc := func(ctx context.Context) (Head, error) { - pollLock.Lock() - defer pollLock.Unlock() - pollNumber++ - h := head{ - BlockNumber: int64(pollNumber), - BlockDifficulty: big.NewInt(int64(pollNumber)), - } - return h.ToMockHead(t), nil - } - - // data channel to receive updates from the poller - channel := make(chan Head, 1) - defer close(channel) - - // Create poller and start to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, lggr) - require.NoError(t, poller.Start()) - defer poller.Unsubscribe() - - // Receive updates from the poller - pollCount := 0 - pollMax := 50 - for ; pollCount < pollMax; pollCount++ { - h := <-channel - assert.Equal(t, int64(pollCount+1), h.BlockNumber()) - } - }) - - t.Run("Test polling errors", func(t *testing.T) { - // Mock polling function that returns an error - var pollNumber int - pollLock := sync.Mutex{} - pollFunc := func(ctx context.Context) (Head, error) { - pollLock.Lock() - defer pollLock.Unlock() - pollNumber++ - return nil, fmt.Errorf("polling error %d", pollNumber) - } - - // data channel to receive updates from the poller - channel := make(chan Head, 1) - defer close(channel) - - olggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) - - // Create poller and subscribe to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, olggr) - require.NoError(t, poller.Start()) - defer poller.Unsubscribe() - - // Ensure that all errors were logged as expected - logsSeen := func() bool { - for pollCount := 0; pollCount < 50; pollCount++ { - numLogs := observedLogs.FilterMessage(fmt.Sprintf("polling error: polling error %d", pollCount+1)).Len() - if numLogs != 1 { - return false - } - } - return true - } - require.Eventually(t, logsSeen, time.Second, time.Millisecond) - }) - - t.Run("Test polling timeout", func(t *testing.T) { - pollFunc := func(ctx context.Context) (Head, error) { - if <-ctx.Done(); true { - return nil, ctx.Err() - } - return nil, nil - } - - // Set instant timeout - pollingTimeout := time.Duration(0) - - // data channel to receive updates from the poller - channel := make(chan Head, 1) - defer close(channel) - - olggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) - - // Create poller and subscribe to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, pollingTimeout, channel, olggr) - require.NoError(t, poller.Start()) - defer poller.Unsubscribe() - - // Ensure that timeout errors were logged as expected - logsSeen := func() bool { - return observedLogs.FilterMessage("polling error: context deadline exceeded").Len() >= 1 - } - require.Eventually(t, logsSeen, time.Second, time.Millisecond) - }) - - t.Run("Test unsubscribe during polling", func(t *testing.T) { - wait := make(chan struct{}) - pollFunc := func(ctx context.Context) (Head, error) { - close(wait) - // Block in polling function until context is cancelled - if <-ctx.Done(); true { - return nil, ctx.Err() - } - return nil, nil - } - - // Set long timeout - pollingTimeout := time.Minute - - // data channel to receive updates from the poller - channel := make(chan Head, 1) - defer close(channel) - - olggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) - - // Create poller and subscribe to receive data - poller := NewPoller[Head](time.Millisecond, pollFunc, pollingTimeout, channel, olggr) - require.NoError(t, poller.Start()) - - // Unsubscribe while blocked in polling function - <-wait - poller.Unsubscribe() - - // Ensure error was logged - logsSeen := func() bool { - return observedLogs.FilterMessage("polling error: context canceled").Len() >= 1 - } - require.Eventually(t, logsSeen, time.Second, time.Millisecond) - }) -} - -func Test_Poller_Unsubscribe(t *testing.T) { - lggr := logger.Test(t) - pollFunc := func(ctx context.Context) (Head, error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - h := head{ - BlockNumber: 0, - BlockDifficulty: big.NewInt(0), - } - return h.ToMockHead(t), nil - } - } - - t.Run("Test multiple unsubscribe", func(t *testing.T) { - channel := make(chan Head, 1) - poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, lggr) - err := poller.Start() - require.NoError(t, err) - - <-channel - poller.Unsubscribe() - poller.Unsubscribe() - }) - - t.Run("Test unsubscribe with closed channel", func(t *testing.T) { - channel := make(chan Head, 1) - poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, lggr) - err := poller.Start() - require.NoError(t, err) - - <-channel - close(channel) - poller.Unsubscribe() - }) -} From 386e26cd0b9f4992d3cb511cb9fb39a129ccd8ac Mon Sep 17 00:00:00 2001 From: HenryNguyen5 <6404866+HenryNguyen5@users.noreply.github.com> Date: Fri, 26 Apr 2024 22:07:50 -0700 Subject: [PATCH 2/8] Revert "Update log trigger log provider readMaxBatchSize to 56 (#12951)" This reverts commit c98ea6413dcdc02a7d0c82b9b36d3fce97dac94b. --- .changeset/tidy-trees-tie.md | 5 ----- .../ocr2keeper/evmregistry/v21/logprovider/provider.go | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) delete mode 100644 .changeset/tidy-trees-tie.md diff --git a/.changeset/tidy-trees-tie.md b/.changeset/tidy-trees-tie.md deleted file mode 100644 index 7ff415e9de4..00000000000 --- a/.changeset/tidy-trees-tie.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"chainlink": patch ---- - -#changed Updating the log trigger log provider's readMaxBatchSize to 56 diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index e2c1a1531e2..b07b08d3354 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -42,7 +42,7 @@ var ( readJobQueueSize = 64 readLogsTimeout = 10 * time.Second - readMaxBatchSize = 56 + readMaxBatchSize = 32 // reorgBuffer is the number of blocks to add as a buffer to the block range when reading logs. reorgBuffer = int64(32) readerThreads = 4 From ba89864f3e2db46fb4de88c5c690d9e461c94e7c Mon Sep 17 00:00:00 2001 From: HenryNguyen5 <6404866+HenryNguyen5@users.noreply.github.com> Date: Fri, 26 Apr 2024 22:08:31 -0700 Subject: [PATCH 3/8] Revert "Wrap RPC errors (#12638)" This reverts commit bcf76534862b32503f4192e38b7e1cb4dd7e312d. --- .changeset/quick-fishes-heal.md | 5 - common/client/models.go | 29 ---- common/client/models_test.go | 16 -- common/client/multi_node.go | 13 +- common/client/multi_node_test.go | 20 +-- core/chains/evm/client/rpc_client.go | 157 ++++++++++++------ core/chains/evm/client/sub_error_wrapper.go | 77 --------- .../evm/client/sub_error_wrapper_test.go | 75 --------- .../chains/evm/gas/block_history_estimator.go | 2 +- 9 files changed, 119 insertions(+), 275 deletions(-) delete mode 100644 .changeset/quick-fishes-heal.md delete mode 100644 common/client/models_test.go delete mode 100644 core/chains/evm/client/sub_error_wrapper.go delete mode 100644 core/chains/evm/client/sub_error_wrapper_test.go diff --git a/.changeset/quick-fishes-heal.md b/.changeset/quick-fishes-heal.md deleted file mode 100644 index 966e74c843a..00000000000 --- a/.changeset/quick-fishes-heal.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"chainlink": patch ---- -#changed -Added prefix `RPCClient returned error ({RPC_NAME})` to RPC errors to simplify filtering of RPC related issues. diff --git a/common/client/models.go b/common/client/models.go index fd0c3915940..66f1e9cf88b 100644 --- a/common/client/models.go +++ b/common/client/models.go @@ -28,35 +28,6 @@ var sendTxSevereErrors = []SendTxReturnCode{Fatal, Underpriced, Unsupported, Exc // sendTxSuccessfulCodes - error codes which signal that transaction was accepted by the node var sendTxSuccessfulCodes = []SendTxReturnCode{Successful, TransactionAlreadyKnown} -func (c SendTxReturnCode) String() string { - switch c { - case Successful: - return "Successful" - case Fatal: - return "Fatal" - case Retryable: - return "Retryable" - case Underpriced: - return "Underpriced" - case Unknown: - return "Unknown" - case Unsupported: - return "Unsupported" - case TransactionAlreadyKnown: - return "TransactionAlreadyKnown" - case InsufficientFunds: - return "InsufficientFunds" - case ExceedsMaxFee: - return "ExceedsMaxFee" - case FeeOutOfValidRange: - return "FeeOutOfValidRange" - case OutOfCounters: - return "OutOfCounters" - default: - return fmt.Sprintf("SendTxReturnCode(%d)", c) - } -} - type NodeTier int const ( diff --git a/common/client/models_test.go b/common/client/models_test.go deleted file mode 100644 index 2d5dc31b373..00000000000 --- a/common/client/models_test.go +++ /dev/null @@ -1,16 +0,0 @@ -package client - -import ( - "strings" - "testing" -) - -func TestSendTxReturnCode_String(t *testing.T) { - // ensure all the SendTxReturnCodes have proper name - for c := 1; c < int(sendTxReturnCodeLen); c++ { - strC := SendTxReturnCode(c).String() - if strings.Contains(strC, "SendTxReturnCode(") { - t.Errorf("Expected %s to have a proper string representation", strC) - } - } -} diff --git a/common/client/multi_node.go b/common/client/multi_node.go index fa413df91aa..cc8daed599c 100644 --- a/common/client/multi_node.go +++ b/common/client/multi_node.go @@ -561,13 +561,6 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP return n.RPC().PendingSequenceAt(ctx, addr) } -type sendTxErrors map[SendTxReturnCode][]error - -// String - returns string representation of the errors map. Required by logger to properly represent the value -func (errs sendTxErrors) String() string { - return fmt.Sprint(map[SendTxReturnCode][]error(errs)) -} - func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT, BATCH_ELEM]) SendEmptyTransaction( ctx context.Context, newTxAttempt func(seq SEQ, feeLimit uint32, fee FEE, fromAddress ADDR) (attempt any, err error), @@ -609,7 +602,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP ctx, cancel := c.chStop.Ctx(ctx) defer cancel() requiredResults := int(math.Ceil(float64(healthyNodesNum) * sendTxQuorum)) - errorsByCode := sendTxErrors{} + errorsByCode := map[SendTxReturnCode][]error{} var softTimeoutChan <-chan time.Time var resultsCount int loop: @@ -646,7 +639,7 @@ loop: func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT, BATCH_ELEM]) reportSendTxAnomalies(tx TX, txResults <-chan sendTxResult) { defer c.wg.Done() - resultsByCode := sendTxErrors{} + resultsByCode := map[SendTxReturnCode][]error{} // txResults eventually will be closed for txResult := range txResults { resultsByCode[txResult.ResultCode] = append(resultsByCode[txResult.ResultCode], txResult.Err) @@ -660,7 +653,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP } } -func aggregateTxResults(resultsByCode sendTxErrors) (txResult error, err error) { +func aggregateTxResults(resultsByCode map[SendTxReturnCode][]error) (txResult error, err error) { severeErrors, hasSevereErrors := findFirstIn(resultsByCode, sendTxSevereErrors) successResults, hasSuccess := findFirstIn(resultsByCode, sendTxSuccessfulCodes) if hasSuccess { diff --git a/common/client/multi_node_test.go b/common/client/multi_node_test.go index 9f6904fcaf2..9c09bd57d70 100644 --- a/common/client/multi_node_test.go +++ b/common/client/multi_node_test.go @@ -796,13 +796,13 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { Name string ExpectedTxResult string ExpectedCriticalErr string - ResultsByCode sendTxErrors + ResultsByCode map[SendTxReturnCode][]error }{ { Name: "Returns success and logs critical error on success and Fatal", ExpectedTxResult: "success", ExpectedCriticalErr: "found contradictions in nodes replies on SendTransaction: got success and severe error", - ResultsByCode: sendTxErrors{ + ResultsByCode: map[SendTxReturnCode][]error{ Successful: {errors.New("success")}, Fatal: {errors.New("fatal")}, }, @@ -811,7 +811,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { Name: "Returns TransactionAlreadyKnown and logs critical error on TransactionAlreadyKnown and Fatal", ExpectedTxResult: "tx_already_known", ExpectedCriticalErr: "found contradictions in nodes replies on SendTransaction: got success and severe error", - ResultsByCode: sendTxErrors{ + ResultsByCode: map[SendTxReturnCode][]error{ TransactionAlreadyKnown: {errors.New("tx_already_known")}, Unsupported: {errors.New("unsupported")}, }, @@ -820,7 +820,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { Name: "Prefers sever error to temporary", ExpectedTxResult: "underpriced", ExpectedCriticalErr: "", - ResultsByCode: sendTxErrors{ + ResultsByCode: map[SendTxReturnCode][]error{ Retryable: {errors.New("retryable")}, Underpriced: {errors.New("underpriced")}, }, @@ -829,7 +829,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { Name: "Returns temporary error", ExpectedTxResult: "retryable", ExpectedCriticalErr: "", - ResultsByCode: sendTxErrors{ + ResultsByCode: map[SendTxReturnCode][]error{ Retryable: {errors.New("retryable")}, }, }, @@ -837,7 +837,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { Name: "Insufficient funds is treated as error", ExpectedTxResult: "", ExpectedCriticalErr: "", - ResultsByCode: sendTxErrors{ + ResultsByCode: map[SendTxReturnCode][]error{ Successful: {nil}, InsufficientFunds: {errors.New("insufficientFunds")}, }, @@ -846,13 +846,13 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { Name: "Logs critical error on empty ResultsByCode", ExpectedTxResult: "expected at least one response on SendTransaction", ExpectedCriticalErr: "expected at least one response on SendTransaction", - ResultsByCode: sendTxErrors{}, + ResultsByCode: map[SendTxReturnCode][]error{}, }, { Name: "Zk out of counter error", ExpectedTxResult: "not enough keccak counters to continue the execution", ExpectedCriticalErr: "", - ResultsByCode: sendTxErrors{ + ResultsByCode: map[SendTxReturnCode][]error{ OutOfCounters: {errors.New("not enough keccak counters to continue the execution")}, }, }, @@ -870,9 +870,6 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { assert.EqualError(t, txResult, testCase.ExpectedTxResult) } - logger.Sugared(logger.Test(t)).Info("Map: " + fmt.Sprint(testCase.ResultsByCode)) - logger.Sugared(logger.Test(t)).Criticalw("observed invariant violation on SendTransaction", "resultsByCode", testCase.ResultsByCode, "err", err) - if testCase.ExpectedCriticalErr == "" { assert.NoError(t, err) } else { @@ -887,4 +884,5 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { delete(codesToCover, codeToIgnore) } assert.Empty(t, codesToCover, "all of the SendTxReturnCode must be covered by this test") + } diff --git a/core/chains/evm/client/rpc_client.go b/core/chains/evm/client/rpc_client.go index 548acf3206c..255b038037a 100644 --- a/core/chains/evm/client/rpc_client.go +++ b/core/chains/evm/client/rpc_client.go @@ -130,7 +130,7 @@ func (r *rpcClient) Dial(callerCtx context.Context) error { wsrpc, err := rpc.DialWebsocket(ctx, r.ws.uri.String(), "") if err != nil { promEVMPoolRPCNodeDialsFailed.WithLabelValues(r.chainID.String(), r.name).Inc() - return r.wrapRPCClientError(pkgerrors.Wrapf(err, "error while dialing websocket: %v", r.ws.uri.Redacted())) + return pkgerrors.Wrapf(err, "error while dialing websocket: %v", r.ws.uri.Redacted()) } r.ws.rpc = wsrpc @@ -159,7 +159,7 @@ func (r *rpcClient) DialHTTP() error { httprpc, err := rpc.DialHTTP(r.http.uri.String()) if err != nil { promEVMPoolRPCNodeDialsFailed.WithLabelValues(r.chainID.String(), r.name).Inc() - return r.wrapRPCClientError(pkgerrors.Wrapf(err, "error while dialing HTTP: %v", r.http.uri.Redacted())) + return pkgerrors.Wrapf(err, "error while dialing HTTP: %v", r.http.uri.Redacted()) } r.http.rpc = httprpc @@ -295,7 +295,10 @@ func (r *rpcClient) UnsubscribeAllExceptAliveLoop() { // CallContext implementation func (r *rpcClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return err + } defer cancel() lggr := r.newRqLggr().With( "method", method, @@ -304,7 +307,6 @@ func (r *rpcClient) CallContext(ctx context.Context, result interface{}, method lggr.Debug("RPC call: evmclient.Client#CallContext") start := time.Now() - var err error if http != nil { err = r.wrapHTTP(http.rpc.CallContext(ctx, result, method, args...)) } else { @@ -318,13 +320,15 @@ func (r *rpcClient) CallContext(ctx context.Context, result interface{}, method } func (r *rpcClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return err + } defer cancel() lggr := r.newRqLggr().With("nBatchElems", len(b), "batchElems", b) lggr.Trace("RPC call: evmclient.Client#BatchCallContext") start := time.Now() - var err error if http != nil { err = r.wrapHTTP(http.rpc.BatchCallContext(ctx, b)) } else { @@ -338,23 +342,24 @@ func (r *rpcClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) err } func (r *rpcClient) Subscribe(ctx context.Context, channel chan<- *evmtypes.Head, args ...interface{}) (commontypes.Subscription, error) { - ctx, cancel, ws, _ := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, _, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("args", args) lggr.Debug("RPC call: evmclient.Client#EthSubscribe") start := time.Now() - var sub commontypes.Subscription sub, err := ws.rpc.EthSubscribe(ctx, channel, args...) if err == nil { - sub = newSubscriptionErrorWrapper(sub, r.rpcClientErrorPrefix()) r.registerSub(sub) } duration := time.Since(start) r.logResult(lggr, err, duration, r.getRPCDomain(), "EthSubscribe") - return sub, r.wrapWS(err) + return sub, err } // GethClient wrappers @@ -365,14 +370,17 @@ func (r *rpcClient) TransactionReceipt(ctx context.Context, txHash common.Hash) return nil, err } if receipt == nil { - err = r.wrapRPCClientError(ethereum.NotFound) + err = ethereum.NotFound return } return } func (r *rpcClient) TransactionReceiptGeth(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("txHash", txHash) @@ -395,7 +403,10 @@ func (r *rpcClient) TransactionReceiptGeth(ctx context.Context, txHash common.Ha return } func (r *rpcClient) TransactionByHash(ctx context.Context, txHash common.Hash) (tx *types.Transaction, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("txHash", txHash) @@ -419,7 +430,10 @@ func (r *rpcClient) TransactionByHash(ctx context.Context, txHash common.Hash) ( } func (r *rpcClient) HeaderByNumber(ctx context.Context, number *big.Int) (header *types.Header, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("number", number) @@ -440,7 +454,10 @@ func (r *rpcClient) HeaderByNumber(ctx context.Context, number *big.Int) (header } func (r *rpcClient) HeaderByHash(ctx context.Context, hash common.Hash) (header *types.Header, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("hash", hash) @@ -477,7 +494,7 @@ func (r *rpcClient) blockByNumber(ctx context.Context, number string) (head *evm return nil, err } if head == nil { - err = r.wrapRPCClientError(ethereum.NotFound) + err = ethereum.NotFound return } head.EVMChainID = ubig.New(r.chainID) @@ -490,7 +507,7 @@ func (r *rpcClient) BlockByHash(ctx context.Context, hash common.Hash) (head *ev return nil, err } if head == nil { - err = r.wrapRPCClientError(ethereum.NotFound) + err = ethereum.NotFound return } head.EVMChainID = ubig.New(r.chainID) @@ -498,7 +515,10 @@ func (r *rpcClient) BlockByHash(ctx context.Context, hash common.Hash) (head *ev } func (r *rpcClient) BlockByHashGeth(ctx context.Context, hash common.Hash) (block *types.Block, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("hash", hash) @@ -521,7 +541,10 @@ func (r *rpcClient) BlockByHashGeth(ctx context.Context, hash common.Hash) (bloc } func (r *rpcClient) BlockByNumberGeth(ctx context.Context, number *big.Int) (block *types.Block, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("number", number) @@ -544,13 +567,15 @@ func (r *rpcClient) BlockByNumberGeth(ctx context.Context, number *big.Int) (blo } func (r *rpcClient) SendTransaction(ctx context.Context, tx *types.Transaction) error { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return err + } defer cancel() lggr := r.newRqLggr().With("tx", tx) lggr.Debug("RPC call: evmclient.Client#SendTransaction") start := time.Now() - var err error if http != nil { err = r.wrapHTTP(http.geth.SendTransaction(ctx, tx)) } else { @@ -582,7 +607,10 @@ func (r *rpcClient) SendEmptyTransaction( // PendingSequenceAt returns one higher than the highest nonce from both mempool and mined transactions func (r *rpcClient) PendingSequenceAt(ctx context.Context, account common.Address) (nonce evmtypes.Nonce, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return 0, err + } defer cancel() lggr := r.newRqLggr().With("account", account) @@ -611,7 +639,10 @@ func (r *rpcClient) PendingSequenceAt(ctx context.Context, account common.Addres // mined nonce at the given block number, but it actually returns the total // transaction count which is the highest mined nonce + 1 func (r *rpcClient) SequenceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (nonce evmtypes.Nonce, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return 0, err + } defer cancel() lggr := r.newRqLggr().With("account", account, "blockNumber", blockNumber) @@ -637,7 +668,10 @@ func (r *rpcClient) SequenceAt(ctx context.Context, account common.Address, bloc } func (r *rpcClient) PendingCodeAt(ctx context.Context, account common.Address) (code []byte, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("account", account) @@ -660,7 +694,10 @@ func (r *rpcClient) PendingCodeAt(ctx context.Context, account common.Address) ( } func (r *rpcClient) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) (code []byte, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("account", account, "blockNumber", blockNumber) @@ -683,7 +720,10 @@ func (r *rpcClient) CodeAt(ctx context.Context, account common.Address, blockNum } func (r *rpcClient) EstimateGas(ctx context.Context, c interface{}) (gas uint64, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return 0, err + } defer cancel() call := c.(ethereum.CallMsg) lggr := r.newRqLggr().With("call", call) @@ -707,7 +747,10 @@ func (r *rpcClient) EstimateGas(ctx context.Context, c interface{}) (gas uint64, } func (r *rpcClient) SuggestGasPrice(ctx context.Context) (price *big.Int, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr() @@ -730,7 +773,10 @@ func (r *rpcClient) SuggestGasPrice(ctx context.Context) (price *big.Int, err er } func (r *rpcClient) CallContract(ctx context.Context, msg interface{}, blockNumber *big.Int) (val []byte, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("callMsg", msg, "blockNumber", blockNumber) message := msg.(ethereum.CallMsg) @@ -758,7 +804,10 @@ func (r *rpcClient) CallContract(ctx context.Context, msg interface{}, blockNumb } func (r *rpcClient) PendingCallContract(ctx context.Context, msg interface{}) (val []byte, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("callMsg", msg) message := msg.(ethereum.CallMsg) @@ -792,7 +841,10 @@ func (r *rpcClient) LatestBlockHeight(ctx context.Context) (*big.Int, error) { } func (r *rpcClient) BlockNumber(ctx context.Context) (height uint64, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return 0, err + } defer cancel() lggr := r.newRqLggr() @@ -815,7 +867,10 @@ func (r *rpcClient) BlockNumber(ctx context.Context) (height uint64, err error) } func (r *rpcClient) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (balance *big.Int, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("account", account.Hex(), "blockNumber", blockNumber) @@ -852,7 +907,7 @@ func (r *rpcClient) TokenBalance(ctx context.Context, address common.Address, co return numLinkBigInt, err } if _, ok := numLinkBigInt.SetString(result, 0); !ok { - return nil, r.wrapRPCClientError(fmt.Errorf("failed to parse int: %s", result)) + return nil, fmt.Errorf("failed to parse int: %s", result) } return numLinkBigInt, nil } @@ -871,7 +926,10 @@ func (r *rpcClient) FilterEvents(ctx context.Context, q ethereum.FilterQuery) ([ } func (r *rpcClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (l []types.Log, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("q", q) @@ -899,7 +957,10 @@ func (r *rpcClient) ClientVersion(ctx context.Context) (version string, err erro } func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (sub ethereum.Subscription, err error) { - ctx, cancel, ws, _ := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, _, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr().With("q", q) @@ -907,7 +968,6 @@ func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQu start := time.Now() sub, err = ws.geth.SubscribeFilterLogs(ctx, q, ch) if err == nil { - sub = newSubscriptionErrorWrapper(sub, r.rpcClientErrorPrefix()) r.registerSub(sub) } err = r.wrapWS(err) @@ -919,7 +979,10 @@ func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQu } func (r *rpcClient) SuggestGasTipCap(ctx context.Context) (tipCap *big.Int, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return nil, err + } defer cancel() lggr := r.newRqLggr() @@ -944,7 +1007,7 @@ func (r *rpcClient) SuggestGasTipCap(ctx context.Context) (tipCap *big.Int, err // Returns the ChainID according to the geth client. This is useful for functions like verify() // the common node. func (r *rpcClient) ChainID(ctx context.Context) (chainID *big.Int, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() @@ -963,15 +1026,6 @@ func (r *rpcClient) newRqLggr() logger.SugaredLogger { return r.rpcLog.With("requestID", uuid.New()) } -func (r *rpcClient) wrapRPCClientError(err error) error { - // simple add msg to the error without adding new stack trace - return pkgerrors.WithMessage(err, r.rpcClientErrorPrefix()) -} - -func (r *rpcClient) rpcClientErrorPrefix() string { - return fmt.Sprintf("RPCClient returned error (%s)", r.name) -} - func wrapCallError(err error, tp string) error { if err == nil { return nil @@ -984,12 +1038,11 @@ func wrapCallError(err error, tp string) error { func (r *rpcClient) wrapWS(err error) error { err = wrapCallError(err, fmt.Sprintf("%s websocket (%s)", r.tier.String(), r.ws.uri.Redacted())) - return r.wrapRPCClientError(err) + return err } func (r *rpcClient) wrapHTTP(err error) error { err = wrapCallError(err, fmt.Sprintf("%s http (%s)", r.tier.String(), r.http.uri.Redacted())) - err = r.wrapRPCClientError(err) if err != nil { r.rpcLog.Debugw("Call failed", "err", err) } else { @@ -999,7 +1052,7 @@ func (r *rpcClient) wrapHTTP(err error) error { } // makeLiveQueryCtxAndSafeGetClients wraps makeQueryCtx -func (r *rpcClient) makeLiveQueryCtxAndSafeGetClients(parentCtx context.Context) (ctx context.Context, cancel context.CancelFunc, ws rawclient, http *rawclient) { +func (r *rpcClient) makeLiveQueryCtxAndSafeGetClients(parentCtx context.Context) (ctx context.Context, cancel context.CancelFunc, ws rawclient, http *rawclient, err error) { // Need to wrap in mutex because state transition can cancel and replace the // context r.stateMu.RLock() @@ -1019,14 +1072,16 @@ func (r *rpcClient) makeQueryCtx(ctx context.Context) (context.Context, context. } func (r *rpcClient) IsSyncing(ctx context.Context) (bool, error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + if err != nil { + return false, err + } defer cancel() lggr := r.newRqLggr() lggr.Debug("RPC call: evmclient.Client#SyncProgress") var syncProgress *ethereum.SyncProgress start := time.Now() - var err error if http != nil { syncProgress, err = http.geth.SyncProgress(ctx) err = r.wrapHTTP(err) diff --git a/core/chains/evm/client/sub_error_wrapper.go b/core/chains/evm/client/sub_error_wrapper.go deleted file mode 100644 index 689991ce70f..00000000000 --- a/core/chains/evm/client/sub_error_wrapper.go +++ /dev/null @@ -1,77 +0,0 @@ -package client - -import ( - "fmt" - - commontypes "github.com/smartcontractkit/chainlink/v2/common/types" -) - -// subErrorWrapper - adds specified prefix to a subscription error -type subErrorWrapper struct { - sub commontypes.Subscription - errorPrefix string - - done chan struct{} - unSub chan struct{} - errorCh chan error -} - -func newSubscriptionErrorWrapper(sub commontypes.Subscription, errorPrefix string) *subErrorWrapper { - s := &subErrorWrapper{ - sub: sub, - errorPrefix: errorPrefix, - done: make(chan struct{}), - unSub: make(chan struct{}), - errorCh: make(chan error), - } - - go func() { - for { - select { - // sub.Err channel is closed by sub.Unsubscribe - case err, ok := <-sub.Err(): - if !ok { - // might only happen if someone terminated wrapped subscription - // in any case - do our best to release resources - // we can't call Unsubscribe on root sub as this might cause panic - close(s.errorCh) - close(s.done) - return - } - - select { - case s.errorCh <- fmt.Errorf("%s: %w", s.errorPrefix, err): - case <-s.unSub: - s.close() - return - } - case <-s.unSub: - s.close() - return - } - } - }() - - return s -} - -func (s *subErrorWrapper) close() { - s.sub.Unsubscribe() - close(s.errorCh) - close(s.done) -} - -func (s *subErrorWrapper) Unsubscribe() { - select { - // already unsubscribed - case <-s.done: - // signal unsubscribe - case s.unSub <- struct{}{}: - // wait for unsubscribe to complete - <-s.done - } -} - -func (s *subErrorWrapper) Err() <-chan error { - return s.errorCh -} diff --git a/core/chains/evm/client/sub_error_wrapper_test.go b/core/chains/evm/client/sub_error_wrapper_test.go deleted file mode 100644 index 457d392a50e..00000000000 --- a/core/chains/evm/client/sub_error_wrapper_test.go +++ /dev/null @@ -1,75 +0,0 @@ -package client - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" -) - -func TestSubscriptionErrorWrapper(t *testing.T) { - t.Parallel() - t.Run("Unsubscribe wrapper releases resources", func(t *testing.T) { - t.Parallel() - - mockedSub := NewMockSubscription() - const prefix = "RPC returned error" - wrapper := newSubscriptionErrorWrapper(mockedSub, prefix) - wrapper.Unsubscribe() - - // mock's resources were relased - assert.True(t, mockedSub.unsubscribed) - _, ok := <-mockedSub.Err() - assert.False(t, ok) - // wrapper's channels are closed - _, ok = <-wrapper.Err() - assert.False(t, ok) - // subsequence unsubscribe does not causes panic - wrapper.Unsubscribe() - }) - t.Run("Unsubscribe interrupts error delivery", func(t *testing.T) { - t.Parallel() - sub := NewMockSubscription() - const prefix = "RPC returned error" - wrapper := newSubscriptionErrorWrapper(sub, prefix) - sub.Errors <- fmt.Errorf("error") - - wrapper.Unsubscribe() - _, ok := <-wrapper.Err() - assert.False(t, ok) - }) - t.Run("Successfully wraps error", func(t *testing.T) { - t.Parallel() - sub := NewMockSubscription() - const prefix = "RPC returned error" - wrapper := newSubscriptionErrorWrapper(sub, prefix) - sub.Errors <- fmt.Errorf("root error") - - err, ok := <-wrapper.Err() - assert.True(t, ok) - assert.Equal(t, "RPC returned error: root error", err.Error()) - - wrapper.Unsubscribe() - _, ok = <-wrapper.Err() - assert.False(t, ok) - }) - t.Run("Unsubscribe on root does not cause panic", func(t *testing.T) { - t.Parallel() - mockedSub := NewMockSubscription() - wrapper := newSubscriptionErrorWrapper(mockedSub, "") - - mockedSub.Unsubscribe() - // mock's resources were released - assert.True(t, mockedSub.unsubscribed) - _, ok := <-mockedSub.Err() - assert.False(t, ok) - // wrapper's channels are eventually closed - tests.AssertEventually(t, func() bool { - _, ok = <-wrapper.Err() - return !ok - }) - - }) -} diff --git a/core/chains/evm/gas/block_history_estimator.go b/core/chains/evm/gas/block_history_estimator.go index 0ae067e45bf..8b8c626f725 100644 --- a/core/chains/evm/gas/block_history_estimator.go +++ b/core/chains/evm/gas/block_history_estimator.go @@ -721,7 +721,7 @@ func (b *BlockHistoryEstimator) batchFetch(ctx context.Context, reqs []rpc.Batch err := b.ethClient.BatchCallContext(ctx, reqs[i:j]) if pkgerrors.Is(err, context.DeadlineExceeded) { // We ran out of time, return what we have - b.logger.Warnf("Batch fetching timed out; loaded %d/%d results: %v", i, len(reqs), err) + b.logger.Warnf("Batch fetching timed out; loaded %d/%d results", i, len(reqs)) for k := i; k < len(reqs); k++ { if k < j { reqs[k].Error = pkgerrors.Wrap(err, "request failed") From ee9ea073b00c130fcdfe869b10332b9119bddcaa Mon Sep 17 00:00:00 2001 From: HenryNguyen5 <6404866+HenryNguyen5@users.noreply.github.com> Date: Fri, 26 Apr 2024 22:09:08 -0700 Subject: [PATCH 4/8] Revert "update contracts README with changesets (#13002)" This reverts commit 0370aa94fbb236ce7ba268adf9a669831b0b40af. --- contracts/CHANGELOG.md | 4 ++-- contracts/README.md | 16 ---------------- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/contracts/CHANGELOG.md b/contracts/CHANGELOG.md index 3139312e325..667a5ad2529 100644 --- a/contracts/CHANGELOG.md +++ b/contracts/CHANGELOG.md @@ -1,6 +1,6 @@ -# @chainlink/contracts +# @chainlink/contracts CHANGELOG.md -## 1.1.0 - 2024-04-23 +## 1.1.0 ### Minor Changes diff --git a/contracts/README.md b/contracts/README.md index 26b0a823298..8df69057229 100644 --- a/contracts/README.md +++ b/contracts/README.md @@ -50,22 +50,6 @@ contribution information. Thank you! -### Changesets - -We use [changesets](https://github.com/changesets/changesets) to manage versioning the contracts. - -Every PR that modifies any configuration or code, should most likely accompanied by a changeset file. - -To install `changesets`: - 1. Install `pnpm` if it is not already installed - [docs](https://pnpm.io/installation). - 2. Run `pnpm install`. - -Either after or before you create a commit, run the `pnpm changeset` command in the `contracts` directory to create an accompanying changeset entry which will reflect on the CHANGELOG for the next release. - -The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - -and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). - ## License [MIT](https://choosealicense.com/licenses/mit/) From ae7884d5487814077eaf7e97ad9c2c29336d9bab Mon Sep 17 00:00:00 2001 From: HenryNguyen5 <6404866+HenryNguyen5@users.noreply.github.com> Date: Fri, 26 Apr 2024 22:09:42 -0700 Subject: [PATCH 5/8] Revert "fix: prevent query syntax error if allowlist is empty (#12912)" This reverts commit b1c8d74272e3b02e0a2a954c3d61b65ecb42f5cf. --- .../services/gateway/handlers/functions/allowlist/orm.go | 5 ----- .../gateway/handlers/functions/allowlist/orm_test.go | 9 --------- 2 files changed, 14 deletions(-) diff --git a/core/services/gateway/handlers/functions/allowlist/orm.go b/core/services/gateway/handlers/functions/allowlist/orm.go index 20a8ed15252..7867c06d5d4 100644 --- a/core/services/gateway/handlers/functions/allowlist/orm.go +++ b/core/services/gateway/handlers/functions/allowlist/orm.go @@ -67,11 +67,6 @@ func (o *orm) GetAllowedSenders(ctx context.Context, offset, limit uint) ([]comm } func (o *orm) CreateAllowedSenders(ctx context.Context, allowedSenders []common.Address) error { - if len(allowedSenders) == 0 { - o.lggr.Debugf("empty allowed senders list: %v for routerContractAddress: %s. skipping...", allowedSenders, o.routerContractAddress) - return nil - } - var valuesPlaceholder []string for i := 1; i <= len(allowedSenders)*2; i += 2 { valuesPlaceholder = append(valuesPlaceholder, fmt.Sprintf("($%d, $%d)", i, i+1)) diff --git a/core/services/gateway/handlers/functions/allowlist/orm_test.go b/core/services/gateway/handlers/functions/allowlist/orm_test.go index 388d47a769b..2584e131968 100644 --- a/core/services/gateway/handlers/functions/allowlist/orm_test.go +++ b/core/services/gateway/handlers/functions/allowlist/orm_test.go @@ -128,15 +128,6 @@ func TestORM_CreateAllowedSenders(t *testing.T) { require.Equal(t, expected[0], results[0]) require.Equal(t, expected[1], results[1]) }) - - // this scenario can happen if the allowlist is empty but we call CreateAllowedSenders - t.Run("OK-empty_list", func(t *testing.T) { - ctx := testutils.Context(t) - orm, err := setupORM(t) - require.NoError(t, err) - err = orm.CreateAllowedSenders(ctx, []common.Address{}) - require.NoError(t, err) - }) } func TestORM_DeleteAllowedSenders(t *testing.T) { From 4601e946506214dbd0d9bf9ca861faf5b3bbe136 Mon Sep 17 00:00:00 2001 From: HenryNguyen5 <6404866+HenryNguyen5@users.noreply.github.com> Date: Fri, 26 Apr 2024 22:10:37 -0700 Subject: [PATCH 6/8] Revert "Fix Node Migration Test Check For Versions (#12982)" This reverts commit 71eed2133fb86a065461787e31db5fb1b18b05de. --- .../action.yml | 1 - .github/workflows/integration-tests.yml | 90 +++++++++++-------- 2 files changed, 51 insertions(+), 40 deletions(-) diff --git a/.github/actions/setup-create-base64-upgrade-config/action.yml b/.github/actions/setup-create-base64-upgrade-config/action.yml index 8f514784725..ed25fd6375f 100644 --- a/.github/actions/setup-create-base64-upgrade-config/action.yml +++ b/.github/actions/setup-create-base64-upgrade-config/action.yml @@ -92,7 +92,6 @@ runs: [ChainlinkUpgradeImage] image="$UPGRADE_IMAGE" version="$UPGRADE_VERSION" - postgres_version="$CHAINLINK_POSTGRES_VERSION" [Logging] test_log_collect=$test_log_collect diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 8dcf32b127e..1f3e093cfdc 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -218,6 +218,40 @@ jobs: AWS_ROLE_TO_ASSUME: ${{ secrets.QA_AWS_ROLE_TO_ASSUME }} dep_evm_sha: ${{ inputs.evm-ref }} + build-test-image: + if: startsWith(github.ref, 'refs/tags/') || github.event_name == 'schedule' || contains(join(github.event.pull_request.labels.*.name, ' '), 'build-test-image') + environment: integration + permissions: + id-token: write + contents: read + name: Build Test Image + runs-on: ubuntu22.04-16cores-64GB + needs: [changes] + steps: + - name: Collect Metrics + if: needs.changes.outputs.src == 'true' || github.event_name == 'workflow_dispatch' + id: collect-gha-metrics + uses: smartcontractkit/push-gha-metrics-action@dea9b546553cb4ca936607c2267a09c004e4ab3f # v3.0.0 + with: + id: ${{ env.COLLECTION_ID }}-build-test-image + org-id: ${{ secrets.GRAFANA_INTERNAL_TENANT_ID }} + basic-auth: ${{ secrets.GRAFANA_INTERNAL_BASIC_AUTH }} + hostname: ${{ secrets.GRAFANA_INTERNAL_HOST }} + this-job-name: Build Test Image + continue-on-error: true + - name: Checkout the repo + uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2 + with: + repository: smartcontractkit/chainlink + ref: ${{ inputs.cl_ref || github.event.pull_request.head.sha || github.event.merge_group.head_sha }} + - name: Build Test Image + if: needs.changes.outputs.src == 'true' || github.event_name == 'workflow_dispatch' + uses: ./.github/actions/build-test-image + with: + QA_AWS_ROLE_TO_ASSUME: ${{ secrets.QA_AWS_ROLE_TO_ASSUME }} + QA_AWS_REGION: ${{ secrets.QA_AWS_REGION }} + QA_AWS_ACCOUNT_NUMBER: ${{ secrets.QA_AWS_ACCOUNT_NUMBER }} + compare-tests: needs: [changes] runs-on: ubuntu-latest @@ -692,7 +726,7 @@ jobs: cache_restore_only: "true" QA_AWS_REGION: ${{ secrets.QA_AWS_REGION }} QA_AWS_ROLE_TO_ASSUME: ${{ secrets.QA_AWS_ROLE_TO_ASSUME }} - QA_KUBECONFIG: "" + QA_KUBECONFIG: ${{ secrets.QA_KUBECONFIG }} should_tidy: "false" - name: Show Otel-Collector Logs if: steps.check-label.outputs.trace == 'true' && matrix.product.name == 'ocr2' && matrix.product.tag_suffix == '-plugins' @@ -796,7 +830,6 @@ jobs: # Run the setup if the matrix finishes but this time save the cache if we have a cache hit miss # this will also only run if both of the matrix jobs pass eth-smoke-go-mod-cache: - environment: integration needs: [eth-smoke-tests] runs-on: ubuntu-latest @@ -830,7 +863,7 @@ jobs: id-token: write contents: read runs-on: ubuntu-latest - needs: [build-chainlink, changes] + needs: [build-chainlink, changes, build-test-image] # Only run migration tests on new tags if: startsWith(github.ref, 'refs/tags/') env: @@ -843,17 +876,6 @@ jobs: TEST_LOG_LEVEL: debug TEST_SUITE: migration steps: - - name: Collect Metrics - id: collect-gha-metrics - uses: smartcontractkit/push-gha-metrics-action@dea9b546553cb4ca936607c2267a09c004e4ab3f # v3.0.0 - with: - id: ${{ env.COLLECTION_ID }}-migration-tests - org-id: ${{ secrets.GRAFANA_INTERNAL_TENANT_ID }} - basic-auth: ${{ secrets.GRAFANA_INTERNAL_BASIC_AUTH }} - hostname: ${{ secrets.GRAFANA_INTERNAL_HOST }} - this-job-name: Version Migration Tests - test-results-file: '{"testType":"go","filePath":"/tmp/gotest.log"}' - continue-on-error: true - name: Checkout the repo uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2 with: @@ -864,12 +886,7 @@ jobs: run: | untrimmed_ver=$(curl --header "Authorization: token ${{ secrets.GITHUB_TOKEN }}" --request GET https://api.github.com/repos/${{ github.repository }}/releases/latest | jq -r .name) latest_version="${untrimmed_ver:1}" - # Check if latest_version is empty - if [ -z "$latest_version" ]; then - echo "Error: The latest_version is empty. The migration tests need a verison to run." - exit 1 - fi - echo "latest_version=${latest_version}" >> "$GITHUB_OUTPUT" + echo "latest_version=${latest_version} | tee -a $GITHUB_OUTPUT" - name: Name Versions run: | echo "Running migration tests from version '${{ steps.get_latest_version.outputs.latest_version }}' to: '${{ inputs.evm-ref || github.sha }}'" @@ -881,22 +898,13 @@ jobs: chainlinkVersion: ${{ steps.get_latest_version.outputs.latest_version }} upgradeImage: ${{ env.UPGRADE_IMAGE }} upgradeVersion: ${{ env.UPGRADE_VERSION }} - runId: ${{ github.run_id }} - testLogCollect: ${{ vars.TEST_LOG_COLLECT }} - lokiEndpoint: https://${{ secrets.GRAFANA_INTERNAL_HOST }}/loki/api/v1/push - lokiTenantId: ${{ secrets.GRAFANA_INTERNAL_TENANT_ID }} - lokiBasicAuth: ${{ secrets.GRAFANA_INTERNAL_BASIC_AUTH }} - logstreamLogTargets: ${{ vars.LOGSTREAM_LOG_TARGETS }} - grafanaUrl: ${{ vars.GRAFANA_URL }} - grafanaDashboardUrl: "/d/ddf75041-1e39-42af-aa46-361fe4c36e9e/ci-e2e-tests-logs" - name: Run Migration Tests uses: smartcontractkit/chainlink-github-actions/chainlink-testing-framework/run-tests@519851800779323566b7b7c22cc21bff95dbb639 # v2.3.11 with: - test_command_to_run: cd ./integration-tests && go test -timeout 20m -count=1 -json ./migration 2>&1 | tee /tmp/gotest.log | gotestloghelper -ci -singlepackage + test_command_to_run: cd ./integration-tests && go test -timeout 30m -count=1 -json ./migration 2>&1 | tee /tmp/gotest.log | gotestloghelper -ci -singlepackage test_download_vendor_packages_command: cd ./integration-tests && go mod download cl_repo: ${{ env.CHAINLINK_IMAGE }} cl_image_tag: ${{ steps.get_latest_version.outputs.latest_version }} - aws_registries: ${{ secrets.QA_AWS_ACCOUNT_NUMBER }} artifacts_name: node-migration-test-logs artifacts_location: | ./integration-tests/migration/logs @@ -908,24 +916,28 @@ jobs: cache_restore_only: "true" QA_AWS_REGION: ${{ secrets.QA_AWS_REGION }} QA_AWS_ROLE_TO_ASSUME: ${{ secrets.QA_AWS_ROLE_TO_ASSUME }} - QA_KUBECONFIG: "" + QA_KUBECONFIG: ${{ secrets.QA_KUBECONFIG }} go_coverage_src_dir: /var/tmp/go-coverage go_coverage_dest_dir: ${{ github.workspace }}/.covdata - should_tidy: "false" - name: Upload Coverage Data uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v4.3.3 with: name: cl-node-coverage-data-migration-tests path: .covdata retention-days: 1 - - name: Notify Slack - if: failure() && github.event_name != 'workflow_dispatch' - uses: slackapi/slack-github-action@6c661ce58804a1a20f6dc5fbee7f0381b469e001 # v1.25.0 - env: - SLACK_BOT_TOKEN: ${{ secrets.QA_SLACK_API_KEY }} + + - name: Collect Metrics + if: always() + id: collect-gha-metrics + uses: smartcontractkit/push-gha-metrics-action@dea9b546553cb4ca936607c2267a09c004e4ab3f # v3.0.0 with: - channel-id: "#team-test-tooling-internal" - slack-message: ":x: :mild-panic-intensifies: Node Migration Tests Failed: ${{ job.html_url }}\n${{ format('https://github.com/smartcontractkit/chainlink/actions/runs/{0}', github.run_id) }}" + id: ${{ env.COLLECTION_ID }}-migration-tests + org-id: ${{ secrets.GRAFANA_INTERNAL_TENANT_ID }} + basic-auth: ${{ secrets.GRAFANA_INTERNAL_BASIC_AUTH }} + hostname: ${{ secrets.GRAFANA_INTERNAL_HOST }} + this-job-name: Version Migration Tests + test-results-file: '{"testType":"go","filePath":"/tmp/gotest.log"}' + continue-on-error: true ## Solana Section get_solana_sha: From 7fb6f7c556b583d0efbdcb41a7d91b70eab11114 Mon Sep 17 00:00:00 2001 From: HenryNguyen5 <6404866+HenryNguyen5@users.noreply.github.com> Date: Fri, 26 Apr 2024 22:11:16 -0700 Subject: [PATCH 7/8] Revert "LogPoller CLI command to resolve reorg greater than finality depth (#12867)" This reverts commit 27d941328655e0cde608c1eff47de736c11e2e58. --- .changeset/brave-dots-breathe.md | 7 -- core/chains/evm/logpoller/disabled.go | 8 -- core/chains/evm/logpoller/log_poller.go | 99 --------------- core/chains/evm/logpoller/log_poller_test.go | 116 ------------------ core/chains/evm/logpoller/mocks/log_poller.go | 48 -------- core/chains/evm/logpoller/observability.go | 6 - core/chains/evm/logpoller/orm.go | 9 -- core/chains/evm/logpoller/orm_test.go | 30 ----- core/cmd/blocks_commands.go | 58 --------- core/cmd/blocks_commands_test.go | 25 ---- core/cmd/shell_local.go | 79 ------------ core/cmd/shell_local_test.go | 56 --------- core/internal/mocks/application.go | 50 -------- core/services/chainlink/application.go | 43 ------- core/web/api.go | 2 +- core/web/lca_controller.go | 74 ----------- core/web/lca_controller_test.go | 29 ----- core/web/router.go | 2 - testdata/scripts/blocks/help.txtar | 3 +- testdata/scripts/help-all/help-all.txtar | 2 - testdata/scripts/node/help.txtar | 1 - 21 files changed, 2 insertions(+), 745 deletions(-) delete mode 100644 .changeset/brave-dots-breathe.md delete mode 100644 core/web/lca_controller.go delete mode 100644 core/web/lca_controller_test.go diff --git a/.changeset/brave-dots-breathe.md b/.changeset/brave-dots-breathe.md deleted file mode 100644 index f1ae4f4d21e..00000000000 --- a/.changeset/brave-dots-breathe.md +++ /dev/null @@ -1,7 +0,0 @@ ---- -"chainlink": minor ---- - -Added a new CLI command, `blocks find-lca,` which finds the latest block that is available in both the database and on the chain for the specified chain. -Added a new CLI command, `node remove-blocks,` which removes all blocks and logs greater than or equal to the specified block number. -#nops #added diff --git a/core/chains/evm/logpoller/disabled.go b/core/chains/evm/logpoller/disabled.go index 6f95b9c55da..f3e64378384 100644 --- a/core/chains/evm/logpoller/disabled.go +++ b/core/chains/evm/logpoller/disabled.go @@ -114,11 +114,3 @@ func (d disabled) LatestBlockByEventSigsAddrsWithConfs(ctx context.Context, from func (d disabled) LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations) ([]Log, error) { return nil, ErrDisabled } - -func (d disabled) FindLCA(ctx context.Context) (*LogPollerBlock, error) { - return nil, ErrDisabled -} - -func (d disabled) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error { - return ErrDisabled -} diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index cd26889627f..7592ec104c4 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -44,8 +44,6 @@ type LogPoller interface { GetFilters() map[string]Filter LatestBlock(ctx context.Context) (LogPollerBlock, error) GetBlocksRange(ctx context.Context, numbers []uint64) ([]LogPollerBlock, error) - FindLCA(ctx context.Context) (*LogPollerBlock, error) - DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error // General querying Logs(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address) ([]Log, error) @@ -1424,103 +1422,6 @@ func (lp *logPoller) IndexedLogsWithSigsExcluding(ctx context.Context, address c return lp.orm.SelectIndexedLogsWithSigsExcluding(ctx, eventSigA, eventSigB, topicIndex, address, fromBlock, toBlock, confs) } -// DeleteLogsAndBlocksAfter - removes blocks and logs starting from the specified block -func (lp *logPoller) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error { - return lp.orm.DeleteLogsAndBlocksAfter(ctx, start) -} - -func (lp *logPoller) FindLCA(ctx context.Context) (*LogPollerBlock, error) { - latest, err := lp.orm.SelectLatestBlock(ctx) - if err != nil { - return nil, fmt.Errorf("failed to select the latest block: %w", err) - } - - oldest, err := lp.orm.SelectOldestBlock(ctx, 0) - if err != nil { - return nil, fmt.Errorf("failed to select the oldest block: %w", err) - } - - if latest == nil || oldest == nil { - return nil, fmt.Errorf("expected at least one block to be present in DB") - } - - lp.lggr.Debugf("Received request to find LCA. Searching in range [%d, %d]", oldest.BlockNumber, latest.BlockNumber) - - // Find the largest block number for which block hash stored in the DB matches one that we get from the RPC. - // `sort.Find` expects slice of following format s = [1, 0, -1] and returns smallest index i for which s[i] = 0. - // To utilise `sort.Find` we represent range of blocks as slice [latestBlock, latestBlock-1, ..., olderBlock+1, oldestBlock] - // and return 1 if DB block was reorged or 0 if it's still present on chain. - lcaI, found := sort.Find(int(latest.BlockNumber-oldest.BlockNumber)+1, func(i int) int { - const notFound = 1 - const found = 0 - // if there is an error - stop the search - if err != nil { - return notFound - } - - // canceled search - if ctx.Err() != nil { - err = fmt.Errorf("aborted, FindLCA request cancelled: %w", ctx.Err()) - return notFound - } - iBlockNumber := latest.BlockNumber - int64(i) - var dbBlock *LogPollerBlock - // Block with specified block number might not exist in the database, to address that we check closest child - // of the iBlockNumber. If the child is present on chain, it's safe to assume that iBlockNumber is present too - dbBlock, err = lp.orm.SelectOldestBlock(ctx, iBlockNumber) - if err != nil { - err = fmt.Errorf("failed to select block %d by number: %w", iBlockNumber, err) - return notFound - } - - if dbBlock == nil { - err = fmt.Errorf("expected block to exist with blockNumber >= %d as observed block with number %d", iBlockNumber, latest.BlockNumber) - return notFound - } - - lp.lggr.Debugf("Looking for matching block on chain blockNumber: %d blockHash: %s", - dbBlock.BlockNumber, dbBlock.BlockHash) - var chainBlock *evmtypes.Head - chainBlock, err = lp.ec.HeadByHash(ctx, dbBlock.BlockHash) - // our block in DB does not exist on chain - if (chainBlock == nil && err == nil) || errors.Is(err, ethereum.NotFound) { - err = nil - return notFound - } - if err != nil { - err = fmt.Errorf("failed to get block %s from RPC: %w", dbBlock.BlockHash, err) - return notFound - } - - if chainBlock.BlockNumber() != dbBlock.BlockNumber { - err = fmt.Errorf("expected block numbers to match (db: %d, chain: %d), if block hashes match "+ - "(db: %s, chain: %s)", dbBlock.BlockNumber, chainBlock.BlockNumber(), dbBlock.BlockHash, chainBlock.Hash) - return notFound - } - - return found - }) - if err != nil { - return nil, fmt.Errorf("failed to find: %w", err) - } - - if !found { - return nil, fmt.Errorf("failed to find LCA, this means that whole database LogPoller state was reorged out of chain or RPC/Core node is misconfigured") - } - - lcaBlockNumber := latest.BlockNumber - int64(lcaI) - lca, err := lp.orm.SelectBlockByNumber(ctx, lcaBlockNumber) - if err != nil { - return nil, fmt.Errorf("failed to select lca from db: %w", err) - } - - if lca == nil { - return nil, fmt.Errorf("expected lca (blockNum: %d) to exist in DB", lcaBlockNumber) - } - - return lca, nil -} - func EvmWord(i uint64) common.Hash { var b = make([]byte, 8) binary.BigEndian.PutUint64(b, i) diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index cb211043a4c..74ec41fa85a 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -1921,119 +1921,3 @@ func markBlockAsFinalizedByHash(t *testing.T, th TestHarness, blockHash common.H require.NoError(t, err) th.Client.Blockchain().SetFinalized(b.Header()) } - -func TestFindLCA(t *testing.T) { - ctx := testutils.Context(t) - ec := evmtest.NewEthClientMockWithDefaultChain(t) - lggr := logger.Test(t) - chainID := testutils.NewRandomEVMChainID() - db := pgtest.NewSqlxDB(t) - - orm := logpoller.NewORM(chainID, db, lggr) - - lpOpts := logpoller.Opts{ - PollPeriod: time.Hour, - FinalityDepth: 2, - BackfillBatchSize: 20, - RpcBatchSize: 10, - KeepFinalizedBlocksDepth: 1000, - } - - lp := logpoller.NewLogPoller(orm, ec, lggr, lpOpts) - t.Run("Fails, if failed to select oldest block", func(t *testing.T) { - _, err := lp.FindLCA(ctx) - require.ErrorContains(t, err, "failed to select the latest block") - }) - // oldest - require.NoError(t, orm.InsertBlock(ctx, common.HexToHash("0x123"), 10, time.Now(), 0)) - // latest - latestBlockHash := common.HexToHash("0x124") - require.NoError(t, orm.InsertBlock(ctx, latestBlockHash, 16, time.Now(), 0)) - t.Run("Fails, if caller's context canceled", func(t *testing.T) { - lCtx, cancel := context.WithCancel(ctx) - ec.On("HeadByHash", mock.Anything, latestBlockHash).Return(nil, nil).Run(func(_ mock.Arguments) { - cancel() - }).Once() - _, err := lp.FindLCA(lCtx) - require.ErrorContains(t, err, "aborted, FindLCA request cancelled") - - }) - t.Run("Fails, if RPC returns an error", func(t *testing.T) { - expectedError := fmt.Errorf("failed to call RPC") - ec.On("HeadByHash", mock.Anything, latestBlockHash).Return(nil, expectedError).Once() - _, err := lp.FindLCA(ctx) - require.ErrorContains(t, err, expectedError.Error()) - }) - t.Run("Fails, if block numbers do not match", func(t *testing.T) { - ec.On("HeadByHash", mock.Anything, latestBlockHash).Return(&evmtypes.Head{ - Number: 123, - }, nil).Once() - _, err := lp.FindLCA(ctx) - require.ErrorContains(t, err, "expected block numbers to match") - }) - t.Run("Fails, if none of the blocks in db matches on chain", func(t *testing.T) { - ec.On("HeadByHash", mock.Anything, mock.Anything).Return(nil, nil).Times(3) - _, err := lp.FindLCA(ctx) - require.ErrorContains(t, err, "failed to find LCA, this means that whole database LogPoller state was reorged out of chain or RPC/Core node is misconfigured") - }) - - type block struct { - BN int - Exists bool - } - testCases := []struct { - Name string - Blocks []block - ExpectedBlockNumber int - ExpectedError error - }{ - { - Name: "All of the blocks are present on chain - returns the latest", - Blocks: []block{{BN: 1, Exists: true}, {BN: 2, Exists: true}, {BN: 3, Exists: true}, {BN: 4, Exists: true}}, - ExpectedBlockNumber: 4, - }, - { - Name: "None of the blocks exists on chain - returns an erro", - Blocks: []block{{BN: 1, Exists: false}, {BN: 2, Exists: false}, {BN: 3, Exists: false}, {BN: 4, Exists: false}}, - ExpectedBlockNumber: 0, - ExpectedError: fmt.Errorf("failed to find LCA, this means that whole database LogPoller state was reorged out of chain or RPC/Core node is misconfigured"), - }, - { - Name: "Only latest block does not exist", - Blocks: []block{{BN: 1, Exists: true}, {BN: 2, Exists: true}, {BN: 3, Exists: true}, {BN: 4, Exists: false}}, - ExpectedBlockNumber: 3, - }, - { - Name: "Only oldest block exists on chain", - Blocks: []block{{BN: 1, Exists: true}, {BN: 2, Exists: false}, {BN: 3, Exists: false}, {BN: 4, Exists: false}}, - ExpectedBlockNumber: 1, - }, - } - - blockHashI := int64(0) - for _, tc := range testCases { - t.Run(tc.Name, func(t *testing.T) { - // reset the database - require.NoError(t, orm.DeleteLogsAndBlocksAfter(ctx, 0)) - for _, b := range tc.Blocks { - blockHashI++ - hash := common.BigToHash(big.NewInt(blockHashI)) - require.NoError(t, orm.InsertBlock(ctx, hash, int64(b.BN), time.Now(), 0)) - // Hashes are unique for all test cases - var onChainBlock *evmtypes.Head - if b.Exists { - onChainBlock = &evmtypes.Head{Number: int64(b.BN)} - } - ec.On("HeadByHash", mock.Anything, hash).Return(onChainBlock, nil).Maybe() - } - - result, err := lp.FindLCA(ctx) - if tc.ExpectedError != nil { - require.ErrorContains(t, err, tc.ExpectedError.Error()) - } else { - require.NotNil(t, result) - require.Equal(t, result.BlockNumber, int64(tc.ExpectedBlockNumber), "expected block numbers to match") - } - }) - } -} diff --git a/core/chains/evm/logpoller/mocks/log_poller.go b/core/chains/evm/logpoller/mocks/log_poller.go index ef3f4dbd428..548e9ca3b90 100644 --- a/core/chains/evm/logpoller/mocks/log_poller.go +++ b/core/chains/evm/logpoller/mocks/log_poller.go @@ -37,54 +37,6 @@ func (_m *LogPoller) Close() error { return r0 } -// DeleteLogsAndBlocksAfter provides a mock function with given fields: ctx, start -func (_m *LogPoller) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error { - ret := _m.Called(ctx, start) - - if len(ret) == 0 { - panic("no return value specified for DeleteLogsAndBlocksAfter") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok { - r0 = rf(ctx, start) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// FindLCA provides a mock function with given fields: ctx -func (_m *LogPoller) FindLCA(ctx context.Context) (*logpoller.LogPollerBlock, error) { - ret := _m.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for FindLCA") - } - - var r0 *logpoller.LogPollerBlock - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) (*logpoller.LogPollerBlock, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) *logpoller.LogPollerBlock); ok { - r0 = rf(ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*logpoller.LogPollerBlock) - } - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // GetBlocksRange provides a mock function with given fields: ctx, numbers func (_m *LogPoller) GetBlocksRange(ctx context.Context, numbers []uint64) ([]logpoller.LogPollerBlock, error) { ret := _m.Called(ctx, numbers) diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go index 8f3cdfe185e..14dec5274ad 100644 --- a/core/chains/evm/logpoller/observability.go +++ b/core/chains/evm/logpoller/observability.go @@ -151,12 +151,6 @@ func (o *ObservedORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, e }) } -func (o *ObservedORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error) { - return withObservedQuery(o, "SelectOldestBlock", func() (*LogPollerBlock, error) { - return o.ORM.SelectOldestBlock(ctx, minAllowedBlockNumber) - }) -} - func (o *ObservedORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs Confirmations) (*Log, error) { return withObservedQuery(o, "SelectLatestLogByEventSigWithConfs", func() (*Log, error) { return o.ORM.SelectLatestLogByEventSigWithConfs(ctx, eventSig, address, confs) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 5e0a74a9183..838a38c8ebb 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -38,7 +38,6 @@ type ORM interface { SelectBlockByNumber(ctx context.Context, blockNumber int64) (*LogPollerBlock, error) SelectBlockByHash(ctx context.Context, hash common.Hash) (*LogPollerBlock, error) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error) - SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error) SelectLogs(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash) ([]Log, error) SelectLogsWithSigs(ctx context.Context, start, end int64, address common.Address, eventSigs []common.Hash) ([]Log, error) @@ -203,14 +202,6 @@ func (o *DSORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error) return &b, nil } -func (o *DSORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error) { - var b LogPollerBlock - if err := o.ds.GetContext(ctx, &b, `SELECT * FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number >= $2 ORDER BY block_number ASC LIMIT 1`, ubig.New(o.chainID), minAllowedBlockNumber); err != nil { - return nil, err - } - return &b, nil -} - func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs Confirmations) (*Log, error) { args, err := newQueryArgsForEvent(o.chainID, address, eventSig). withConfs(confs). diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 2a1be62dd5b..8a45ff2f1c5 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -1759,33 +1759,3 @@ func Benchmark_DeleteExpiredLogs(b *testing.B) { assert.NoError(b, err1) } } - -func TestSelectOldestBlock(t *testing.T) { - th := SetupTH(t, lpOpts) - o1 := th.ORM - o2 := th.ORM2 - ctx := testutils.Context(t) - t.Run("Selects oldest within given chain", func(t *testing.T) { - // insert blocks - require.NoError(t, o2.InsertBlock(ctx, common.HexToHash("0x1231"), 11, time.Now(), 0)) - require.NoError(t, o2.InsertBlock(ctx, common.HexToHash("0x1232"), 12, time.Now(), 0)) - // insert newer block from different chain - require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1233"), 13, time.Now(), 0)) - require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1231"), 14, time.Now(), 0)) - block, err := o1.SelectOldestBlock(ctx, 0) - require.NoError(t, err) - require.NotNil(t, block) - require.Equal(t, block.BlockNumber, int64(13)) - require.Equal(t, block.BlockHash, common.HexToHash("0x1233")) - }) - t.Run("Does not select blocks older than specified limit", func(t *testing.T) { - require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1232"), 11, time.Now(), 0)) - require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1233"), 13, time.Now(), 0)) - require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1234"), 15, time.Now(), 0)) - block, err := o1.SelectOldestBlock(ctx, 12) - require.NoError(t, err) - require.NotNil(t, block) - require.Equal(t, block.BlockNumber, int64(13)) - require.Equal(t, block.BlockHash, common.HexToHash("0x1233")) - }) -} diff --git a/core/cmd/blocks_commands.go b/core/cmd/blocks_commands.go index 158caf253ab..72b0523e18d 100644 --- a/core/cmd/blocks_commands.go +++ b/core/cmd/blocks_commands.go @@ -9,8 +9,6 @@ import ( "github.com/pkg/errors" "github.com/urfave/cli" "go.uber.org/multierr" - - "github.com/smartcontractkit/chainlink/v2/core/web" ) func initBlocksSubCmds(s *Shell) []cli.Command { @@ -36,18 +34,6 @@ func initBlocksSubCmds(s *Shell) []cli.Command { }, }, }, - { - Name: "find-lca", - Usage: "Find latest common block stored in DB and on chain", - Action: s.FindLCA, - Flags: []cli.Flag{ - cli.Int64Flag{ - Name: "evm-chain-id", - Usage: "Chain ID of the EVM-based blockchain", - Required: true, - }, - }, - }, } } @@ -89,47 +75,3 @@ func (s *Shell) ReplayFromBlock(c *cli.Context) (err error) { fmt.Println("Replay started") return nil } - -// LCAPresenter implements TableRenderer for an LCAResponse. -type LCAPresenter struct { - web.LCAResponse -} - -// ToRow presents the EVMChainResource as a slice of strings. -func (p *LCAPresenter) ToRow() []string { - return []string{p.EVMChainID.String(), p.Hash, strconv.FormatInt(p.BlockNumber, 10)} -} - -// RenderTable implements TableRenderer -// Just renders a single row -func (p LCAPresenter) RenderTable(rt RendererTable) error { - renderList([]string{"ChainID", "Block Hash", "Block Number"}, [][]string{p.ToRow()}, rt.Writer) - - return nil -} - -// FindLCA finds last common block stored in DB and on chain. -func (s *Shell) FindLCA(c *cli.Context) (err error) { - v := url.Values{} - - if c.IsSet("evm-chain-id") { - v.Add("evmChainID", fmt.Sprintf("%d", c.Int64("evm-chain-id"))) - } - - resp, err := s.HTTP.Get(s.ctx(), - fmt.Sprintf( - "/v2/find_lca?%s", - v.Encode(), - )) - if err != nil { - return s.errorOut(err) - } - - defer func() { - if cerr := resp.Body.Close(); cerr != nil { - err = multierr.Append(err, cerr) - } - }() - - return s.renderAPIResponse(resp, &LCAPresenter{}, "Last Common Ancestor") -} diff --git a/core/cmd/blocks_commands_test.go b/core/cmd/blocks_commands_test.go index f7656b94ae1..30540748cb1 100644 --- a/core/cmd/blocks_commands_test.go +++ b/core/cmd/blocks_commands_test.go @@ -41,28 +41,3 @@ func Test_ReplayFromBlock(t *testing.T) { c = cli.NewContext(nil, set, nil) require.NoError(t, client.ReplayFromBlock(c)) } - -func Test_FindLCA(t *testing.T) { - t.Parallel() - - //ethClient.On("BalanceAt", mock.Anything, mock.Anything, mock.Anything).Return(big.NewInt(42), nil) - app := startNewApplicationV2(t, func(c *chainlink.Config, s *chainlink.Secrets) { - c.EVM[0].ChainID = (*ubig.Big)(big.NewInt(5)) - c.EVM[0].Enabled = ptr(true) - }) - - client, _ := app.NewShellAndRenderer() - - set := flag.NewFlagSet("test", 0) - flagSetApplyFromAction(client.FindLCA, set, "") - - //Incorrect chain ID - require.NoError(t, set.Set("evm-chain-id", "1")) - c := cli.NewContext(nil, set, nil) - require.ErrorContains(t, client.FindLCA(c), "does not match any local chains") - - //Correct chain ID - require.NoError(t, set.Set("evm-chain-id", "5")) - c = cli.NewContext(nil, set, nil) - require.ErrorContains(t, client.FindLCA(c), "FindLCA is only available if LogPoller is enabled") -} diff --git a/core/cmd/shell_local.go b/core/cmd/shell_local.go index 7c9c025d4be..24cb43e2090 100644 --- a/core/cmd/shell_local.go +++ b/core/cmd/shell_local.go @@ -34,7 +34,6 @@ import ( "github.com/jmoiron/sqlx" cutils "github.com/smartcontractkit/chainlink-common/pkg/utils" - "github.com/smartcontractkit/chainlink/v2/core/build" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" @@ -254,23 +253,6 @@ func initLocalSubCmds(s *Shell, safe bool) []cli.Command { }, }, }, - { - Name: "remove-blocks", - Usage: "Deletes block range and all associated data", - Action: s.RemoveBlocks, - Flags: []cli.Flag{ - cli.IntFlag{ - Name: "start", - Usage: "Beginning of block range to be deleted", - Required: true, - }, - cli.Int64Flag{ - Name: "evm-chain-id", - Usage: "Chain ID of the EVM-based blockchain", - Required: true, - }, - }, - }, } } @@ -1198,64 +1180,3 @@ func insertFixtures(dbURL url.URL, pathToFixtures string) (err error) { _, err = db.Exec(string(fixturesSQL)) return err } - -// RemoveBlocks - removes blocks after the specified blocks number -func (s *Shell) RemoveBlocks(c *cli.Context) error { - start := c.Int64("start") - if start <= 0 { - return s.errorOut(errors.New("Must pass a positive value in '--start' parameter")) - } - - chainID := big.NewInt(0) - if c.IsSet("evm-chain-id") { - err := chainID.UnmarshalText([]byte(c.String("evm-chain-id"))) - if err != nil { - return s.errorOut(err) - } - } - - cfg := s.Config - err := cfg.Validate() - if err != nil { - return s.errorOut(fmt.Errorf("error validating configuration: %+v", err)) - } - - lggr := logger.Sugared(s.Logger.Named("RemoveBlocks")) - ldb := pg.NewLockedDB(cfg.AppID(), cfg.Database(), cfg.Database().Lock(), lggr) - ctx, cancel := context.WithCancel(context.Background()) - go shutdown.HandleShutdown(func(sig string) { - cancel() - lggr.Info("received signal to stop - closing the database and releasing lock") - - if cErr := ldb.Close(); cErr != nil { - lggr.Criticalf("Failed to close LockedDB: %v", cErr) - } - - if cErr := s.CloseLogger(); cErr != nil { - log.Printf("Failed to close Logger: %v", cErr) - } - }) - - if err = ldb.Open(ctx); err != nil { - // If not successful, we know neither locks nor connection remains opened - return s.errorOut(errors.Wrap(err, "opening db")) - } - defer lggr.ErrorIfFn(ldb.Close, "Error closing db") - - // From now on, DB locks and DB connection will be released on every return. - // Keep watching on logger.Fatal* calls and os.Exit(), because defer will not be executed. - - app, err := s.AppFactory.NewApplication(ctx, s.Config, s.Logger, ldb.DB()) - if err != nil { - return s.errorOut(errors.Wrap(err, "fatal error instantiating application")) - } - - err = app.DeleteLogPollerDataAfter(ctx, chainID, start) - if err != nil { - return s.errorOut(err) - } - - lggr.Infof("RemoveBlocks: successfully removed blocks") - - return nil -} diff --git a/core/cmd/shell_local_test.go b/core/cmd/shell_local_test.go index e7322e513ae..7427e6caedb 100644 --- a/core/cmd/shell_local_test.go +++ b/core/cmd/shell_local_test.go @@ -2,7 +2,6 @@ package cmd_test import ( "flag" - "fmt" "math/big" "os" "strconv" @@ -515,58 +514,3 @@ func TestShell_CleanupChainTables(t *testing.T) { c := cli.NewContext(nil, set, nil) require.NoError(t, client.CleanupChainTables(c)) } - -func TestShell_RemoveBlocks(t *testing.T) { - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { - s.Password.Keystore = models.NewSecret("dummy") - c.EVM[0].Nodes[0].Name = ptr("fake") - c.EVM[0].Nodes[0].HTTPURL = commonconfig.MustParseURL("http://fake.com") - c.EVM[0].Nodes[0].WSURL = commonconfig.MustParseURL("WSS://fake.com/ws") - // seems to be needed for config validate - c.Insecure.OCRDevelopmentMode = nil - }) - - lggr := logger.TestLogger(t) - - app := mocks.NewApplication(t) - app.On("GetSqlxDB").Maybe().Return(db) - shell := cmd.Shell{ - Config: cfg, - AppFactory: cltest.InstanceAppFactory{App: app}, - FallbackAPIInitializer: cltest.NewMockAPIInitializer(t), - Runner: cltest.EmptyRunner{}, - Logger: lggr, - } - - t.Run("Returns error, if --start is not positive", func(t *testing.T) { - set := flag.NewFlagSet("test", 0) - flagSetApplyFromAction(shell.RemoveBlocks, set, "") - require.NoError(t, set.Set("start", "0")) - require.NoError(t, set.Set("evm-chain-id", "12")) - c := cli.NewContext(nil, set, nil) - err := shell.RemoveBlocks(c) - require.ErrorContains(t, err, "Must pass a positive value in '--start' parameter") - }) - t.Run("Returns error, if removal fails", func(t *testing.T) { - set := flag.NewFlagSet("test", 0) - flagSetApplyFromAction(shell.RemoveBlocks, set, "") - require.NoError(t, set.Set("start", "10000")) - require.NoError(t, set.Set("evm-chain-id", "12")) - expectedError := fmt.Errorf("failed to delete log poller's data") - app.On("DeleteLogPollerDataAfter", mock.Anything, big.NewInt(12), int64(10000)).Return(expectedError).Once() - c := cli.NewContext(nil, set, nil) - err := shell.RemoveBlocks(c) - require.ErrorContains(t, err, expectedError.Error()) - }) - t.Run("Happy path", func(t *testing.T) { - set := flag.NewFlagSet("test", 0) - flagSetApplyFromAction(shell.RemoveBlocks, set, "") - require.NoError(t, set.Set("start", "10000")) - require.NoError(t, set.Set("evm-chain-id", "12")) - app.On("DeleteLogPollerDataAfter", mock.Anything, big.NewInt(12), int64(10000)).Return(nil).Once() - c := cli.NewContext(nil, set, nil) - err := shell.RemoveBlocks(c) - require.NoError(t, err) - }) -} diff --git a/core/internal/mocks/application.go b/core/internal/mocks/application.go index f845d46ca8d..c83b37a0e5d 100644 --- a/core/internal/mocks/application.go +++ b/core/internal/mocks/application.go @@ -23,8 +23,6 @@ import ( logger "github.com/smartcontractkit/chainlink/v2/core/logger" - logpoller "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" - mock "github.com/stretchr/testify/mock" pipeline "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" @@ -149,24 +147,6 @@ func (_m *Application) DeleteJob(ctx context.Context, jobID int32) error { return r0 } -// DeleteLogPollerDataAfter provides a mock function with given fields: ctx, chainID, start -func (_m *Application) DeleteLogPollerDataAfter(ctx context.Context, chainID *big.Int, start int64) error { - ret := _m.Called(ctx, chainID, start) - - if len(ret) == 0 { - panic("no return value specified for DeleteLogPollerDataAfter") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *big.Int, int64) error); ok { - r0 = rf(ctx, chainID, start) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // EVMORM provides a mock function with given fields: func (_m *Application) EVMORM() types.Configs { ret := _m.Called() @@ -187,36 +167,6 @@ func (_m *Application) EVMORM() types.Configs { return r0 } -// FindLCA provides a mock function with given fields: ctx, chainID -func (_m *Application) FindLCA(ctx context.Context, chainID *big.Int) (*logpoller.LogPollerBlock, error) { - ret := _m.Called(ctx, chainID) - - if len(ret) == 0 { - panic("no return value specified for FindLCA") - } - - var r0 *logpoller.LogPollerBlock - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*logpoller.LogPollerBlock, error)); ok { - return rf(ctx, chainID) - } - if rf, ok := ret.Get(0).(func(context.Context, *big.Int) *logpoller.LogPollerBlock); ok { - r0 = rf(ctx, chainID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*logpoller.LogPollerBlock) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { - r1 = rf(ctx, chainID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // GetAuditLogger provides a mock function with given fields: func (_m *Application) GetAuditLogger() audit.AuditLogger { ret := _m.Called() diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index ae3db2e7a73..2aebef3f8f7 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -22,9 +22,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/utils" "github.com/smartcontractkit/chainlink-common/pkg/utils/jsonserializable" "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" - "github.com/smartcontractkit/chainlink/v2/core/capabilities" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/static" "github.com/smartcontractkit/chainlink/v2/core/bridges" @@ -117,11 +115,6 @@ type Application interface { ID() uuid.UUID SecretGenerator() SecretGenerator - - // FindLCA - finds last common ancestor for LogPoller's chain available in the database and RPC chain - FindLCA(ctx context.Context, chainID *big.Int) (*logpoller.LogPollerBlock, error) - // DeleteLogPollerDataAfter - delete LogPoller state starting from the specified block - DeleteLogPollerDataAfter(ctx context.Context, chainID *big.Int, start int64) error } // ChainlinkApplication contains fields for the JobSubscriber, Scheduler, @@ -893,39 +886,3 @@ func (app *ChainlinkApplication) GetWebAuthnConfiguration() sessions.WebAuthnCon func (app *ChainlinkApplication) ID() uuid.UUID { return app.Config.AppID() } - -// FindLCA - finds last common ancestor -func (app *ChainlinkApplication) FindLCA(ctx context.Context, chainID *big.Int) (*logpoller.LogPollerBlock, error) { - chain, err := app.GetRelayers().LegacyEVMChains().Get(chainID.String()) - if err != nil { - return nil, err - } - if !app.Config.Feature().LogPoller() { - return nil, fmt.Errorf("FindLCA is only available if LogPoller is enabled") - } - - lca, err := chain.LogPoller().FindLCA(ctx) - if err != nil { - return nil, fmt.Errorf("failed to find lca: %w", err) - } - - return lca, nil -} - -// DeleteLogPollerDataAfter - delete LogPoller state starting from the specified block -func (app *ChainlinkApplication) DeleteLogPollerDataAfter(ctx context.Context, chainID *big.Int, start int64) error { - chain, err := app.GetRelayers().LegacyEVMChains().Get(chainID.String()) - if err != nil { - return err - } - if !app.Config.Feature().LogPoller() { - return fmt.Errorf("DeleteLogPollerDataAfter is only available if LogPoller is enabled") - } - - err = chain.LogPoller().DeleteLogsAndBlocksAfter(ctx, start) - if err != nil { - return fmt.Errorf("failed to recover LogPoller: %w", err) - } - - return nil -} diff --git a/core/web/api.go b/core/web/api.go index 51f7b855cd5..1f97d59c77d 100644 --- a/core/web/api.go +++ b/core/web/api.go @@ -120,7 +120,7 @@ func ParsePaginatedResponse(input []byte, resource interface{}, links *jsonapi.L func parsePaginatedResponseToDocument(input []byte, resource interface{}, document *jsonapi.Document) error { err := ParseJSONAPIResponse(input, resource) if err != nil { - return errors.Wrapf(err, "ParseJSONAPIResponse error body: %s", string(input)) + return errors.Wrap(err, "ParseJSONAPIResponse error") } // Unmarshal using the stdlib Unmarshal to extract the links part of the document diff --git a/core/web/lca_controller.go b/core/web/lca_controller.go deleted file mode 100644 index bb4866c3d08..00000000000 --- a/core/web/lca_controller.go +++ /dev/null @@ -1,74 +0,0 @@ -package web - -import ( - "errors" - "fmt" - "net/http" - - "github.com/gin-gonic/gin" - - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" - "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" -) - -type LCAController struct { - App chainlink.Application -} - -// FindLCA compares chain of blocks available in the DB with chain provided by an RPC and returns last common ancestor -// Example: -// -// "/v2/find_lca" -func (bdc *LCAController) FindLCA(c *gin.Context) { - chain, err := getChain(bdc.App.GetRelayers().LegacyEVMChains(), c.Query("evmChainID")) - if err != nil { - if errors.Is(err, ErrInvalidChainID) || errors.Is(err, ErrMultipleChains) || errors.Is(err, ErrMissingChainID) { - jsonAPIError(c, http.StatusUnprocessableEntity, err) - return - } - jsonAPIError(c, http.StatusInternalServerError, err) - return - } - chainID := chain.ID() - - lca, err := bdc.App.FindLCA(c.Request.Context(), chainID) - if err != nil { - jsonAPIError(c, http.StatusInternalServerError, err) - return - } - - if lca == nil { - jsonAPIError(c, http.StatusNotFound, fmt.Errorf("failed to find last common ancestor")) - return - } - - response := LCAResponse{ - BlockNumber: lca.BlockNumber, - Hash: lca.BlockHash.String(), - EVMChainID: big.New(chainID), - } - jsonAPIResponse(c, &response, "response") - -} - -type LCAResponse struct { - BlockNumber int64 `json:"blockNumber"` - Hash string `json:"hash"` - EVMChainID *big.Big `json:"evmChainID"` -} - -// GetID returns the jsonapi ID. -func (s LCAResponse) GetID() string { - return "LCAResponseID" -} - -// GetName returns the collection name for jsonapi. -func (LCAResponse) GetName() string { - return "lca_response" -} - -// SetID is used to conform to the UnmarshallIdentifier interface for -// deserializing from jsonapi documents. -func (*LCAResponse) SetID(string) error { - return nil -} diff --git a/core/web/lca_controller_test.go b/core/web/lca_controller_test.go deleted file mode 100644 index 7ec476e8eca..00000000000 --- a/core/web/lca_controller_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package web_test - -import ( - _ "embed" - "io" - "net/http" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" -) - -func TestLCAController_FindLCA(t *testing.T) { - cfg := configtest.NewTestGeneralConfig(t) - ec := setupEthClientForControllerTests(t) - app := cltest.NewApplicationWithConfigAndKey(t, cfg, cltest.DefaultP2PKey, ec) - require.NoError(t, app.Start(testutils.Context(t))) - client := app.NewHTTPClient(nil) - resp, cleanup := client.Get("/v2/find_lca?evmChainID=1") - t.Cleanup(cleanup) - assert.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) - b, err := io.ReadAll(resp.Body) - require.NoError(t, err) - assert.Contains(t, string(b), "chain id does not match any local chains") -} diff --git a/core/web/router.go b/core/web/router.go index 158ea4b411f..c327583a005 100644 --- a/core/web/router.go +++ b/core/web/router.go @@ -292,8 +292,6 @@ func v2Routes(app chainlink.Application, r *gin.RouterGroup) { rc := ReplayController{app} authv2.POST("/replay_from_block/:number", auth.RequiresRunRole(rc.ReplayFromBlock)) - lcaC := LCAController{app} - authv2.GET("/find_lca", auth.RequiresRunRole(lcaC.FindLCA)) csakc := CSAKeysController{app} authv2.GET("/keys/csa", csakc.Index) diff --git a/testdata/scripts/blocks/help.txtar b/testdata/scripts/blocks/help.txtar index 5d362a082fd..55aaf71858d 100644 --- a/testdata/scripts/blocks/help.txtar +++ b/testdata/scripts/blocks/help.txtar @@ -9,8 +9,7 @@ USAGE: chainlink blocks command [command options] [arguments...] COMMANDS: - replay Replays block data from the given number - find-lca Find latest common block stored in DB and on chain + replay Replays block data from the given number OPTIONS: --help, -h show help diff --git a/testdata/scripts/help-all/help-all.txtar b/testdata/scripts/help-all/help-all.txtar index e111295abb4..eeaf0da98d1 100644 --- a/testdata/scripts/help-all/help-all.txtar +++ b/testdata/scripts/help-all/help-all.txtar @@ -16,7 +16,6 @@ admin users list # Lists all API users and their roles attempts # Commands for managing Ethereum Transaction Attempts attempts list # List the Transaction Attempts in descending order blocks # Commands for managing blocks -blocks find-lca # Find latest common block stored in DB and on chain blocks replay # Replays block data from the given number bridges # Commands for Bridges communicating with External Adapters bridges create # Create a new Bridge to an External Adapter @@ -133,7 +132,6 @@ node db status # Display the current database migration status. node db version # Display the current database version. node profile # Collects profile metrics from the node. node rebroadcast-transactions # Manually rebroadcast txs matching nonce range with the specified gas price. This is useful in emergencies e.g. high gas prices and/or network congestion to forcibly clear out the pending TX queue -node remove-blocks # Deletes block range and all associated data node start # Run the Chainlink node node status # Displays the health of various services running inside the node. node validate # Validate the TOML configuration and secrets that are passed as flags to the `node` command. Prints the full effective configuration, with defaults included diff --git a/testdata/scripts/node/help.txtar b/testdata/scripts/node/help.txtar index 875500b13df..33e1fdc90bc 100644 --- a/testdata/scripts/node/help.txtar +++ b/testdata/scripts/node/help.txtar @@ -13,7 +13,6 @@ COMMANDS: rebroadcast-transactions Manually rebroadcast txs matching nonce range with the specified gas price. This is useful in emergencies e.g. high gas prices and/or network congestion to forcibly clear out the pending TX queue validate Validate the TOML configuration and secrets that are passed as flags to the `node` command. Prints the full effective configuration, with defaults included db Commands for managing the database. - remove-blocks Deletes block range and all associated data OPTIONS: --config value, -c value TOML configuration file(s) via flag, or raw TOML via env var. If used, legacy env vars must not be set. Multiple files can be used (-c configA.toml -c configB.toml), and they are applied in order with duplicated fields overriding any earlier values. If the 'CL_CONFIG' env var is specified, it is always processed last with the effect of being the final override. [$CL_CONFIG] From c5f77ba07c11ab1cf5c46e08f8802c11b5d8fcc3 Mon Sep 17 00:00:00 2001 From: HenryNguyen5 <6404866+HenryNguyen5@users.noreply.github.com> Date: Fri, 26 Apr 2024 22:12:12 -0700 Subject: [PATCH 8/8] Revert "Configurable Mercury transmitter parameters (#12680)" This reverts commit f55d8be495a83c97ac5439672563400e12ec2ee7. --- .changeset/sour-jars-cross.md | 13 ------ core/cmd/shell.go | 5 +-- core/config/docs/core.toml | 14 ------- core/config/mercury_config.go | 7 ---- core/config/toml/types.go | 20 +-------- core/internal/cltest/cltest.go | 3 +- core/services/chainlink/config_mercury.go | 21 ---------- core/services/chainlink/config_test.go | 8 ---- core/services/chainlink/relayer_factory.go | 9 ++-- .../testdata/config-empty-effective.toml | 4 -- .../chainlink/testdata/config-full.toml | 4 -- .../config-multi-chain-effective.toml | 4 -- core/services/ocr2/delegate.go | 1 - core/services/relay/evm/evm.go | 25 +++++------ .../services/relay/evm/mercury/transmitter.go | 27 +++++------- .../relay/evm/mercury/transmitter_test.go | 42 +++++++------------ .../evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go | 1 - .../testdata/config-empty-effective.toml | 4 -- core/web/resolver/testdata/config-full.toml | 4 -- .../config-multi-chain-effective.toml | 4 -- docs/CONFIG.md | 27 ------------ testdata/scripts/node/validate/default.txtar | 4 -- .../disk-based-logging-disabled.txtar | 4 -- .../validate/disk-based-logging-no-dir.txtar | 4 -- .../node/validate/disk-based-logging.txtar | 4 -- .../node/validate/invalid-ocr-p2p.txtar | 4 -- testdata/scripts/node/validate/invalid.txtar | 4 -- testdata/scripts/node/validate/valid.txtar | 4 -- testdata/scripts/node/validate/warnings.txtar | 4 -- 29 files changed, 45 insertions(+), 234 deletions(-) delete mode 100644 .changeset/sour-jars-cross.md diff --git a/.changeset/sour-jars-cross.md b/.changeset/sour-jars-cross.md deleted file mode 100644 index b904e8e3dd0..00000000000 --- a/.changeset/sour-jars-cross.md +++ /dev/null @@ -1,13 +0,0 @@ ---- -"chainlink": patch ---- - -#added - -Add configurability to mercury transmitter - -```toml -[Mercury.Transmitter] -TransmitQueueMaxSize = 10_000 # Default -TransmitTimeout = "5s" # Default -``` diff --git a/core/cmd/shell.go b/core/cmd/shell.go index adbb66ce63f..0372148e742 100644 --- a/core/cmd/shell.go +++ b/core/cmd/shell.go @@ -174,9 +174,8 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G } evmFactoryCfg := chainlink.EVMFactoryConfig{ - CSAETHKeystore: keyStore, - ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, MailMon: mailMon, DS: ds}, - MercuryTransmitter: cfg.Mercury().Transmitter(), + CSAETHKeystore: keyStore, + ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, MailMon: mailMon, DS: ds}, } // evm always enabled for backward compatibility // TODO BCF-2510 this needs to change in order to clear the path for EVM extraction diff --git a/core/config/docs/core.toml b/core/config/docs/core.toml index 92d75430daf..605f6ced0bc 100644 --- a/core/config/docs/core.toml +++ b/core/config/docs/core.toml @@ -622,17 +622,3 @@ LatestReportDeadline = "5s" # Default [Mercury.TLS] # CertFile is the path to a PEM file of trusted root certificate authority certificates CertFile = "/path/to/client/certs.pem" # Example - -# Mercury.Transmitter controls settings for the mercury transmitter -[Mercury.Transmitter] -# TransmitQueueMaxSize controls the size of the transmit queue. This is scoped -# per OCR instance. If the queue is full, the transmitter will start dropping -# the oldest messages in order to make space. -# -# This is useful if mercury server goes offline and the nop needs to buffer -# transmissions. -TransmitQueueMaxSize = 10_000 # Default -# TransmitTimeout controls how long the transmitter will wait for a response -# when sending a message to the mercury server, before aborting and considering -# the transmission to be failed. -TransmitTimeout = "5s" # Default diff --git a/core/config/mercury_config.go b/core/config/mercury_config.go index f16fc4661a5..1210fd282ef 100644 --- a/core/config/mercury_config.go +++ b/core/config/mercury_config.go @@ -3,7 +3,6 @@ package config import ( "time" - commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/types" ) @@ -17,14 +16,8 @@ type MercuryTLS interface { CertFile() string } -type MercuryTransmitter interface { - TransmitQueueMaxSize() uint32 - TransmitTimeout() commonconfig.Duration -} - type Mercury interface { Credentials(credName string) *types.MercuryCredentials Cache() MercuryCache TLS() MercuryTLS - Transmitter() MercuryTransmitter } diff --git a/core/config/toml/types.go b/core/config/toml/types.go index ba74528b3b6..ed52c21e34e 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -1312,30 +1312,14 @@ func (m *MercuryTLS) ValidateConfig() (err error) { return } -type MercuryTransmitter struct { - TransmitQueueMaxSize *uint32 - TransmitTimeout *commonconfig.Duration -} - -func (m *MercuryTransmitter) setFrom(f *MercuryTransmitter) { - if v := f.TransmitQueueMaxSize; v != nil { - m.TransmitQueueMaxSize = v - } - if v := f.TransmitTimeout; v != nil { - m.TransmitTimeout = v - } -} - type Mercury struct { - Cache MercuryCache `toml:",omitempty"` - TLS MercuryTLS `toml:",omitempty"` - Transmitter MercuryTransmitter `toml:",omitempty"` + Cache MercuryCache `toml:",omitempty"` + TLS MercuryTLS `toml:",omitempty"` } func (m *Mercury) setFrom(f *Mercury) { m.Cache.setFrom(&f.Cache) m.TLS.setFrom(&f.TLS) - m.Transmitter.setFrom(&f.Transmitter) } func (m *Mercury) ValidateConfig() (err error) { diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index 58cedbb96e1..dc7079e44d9 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -369,8 +369,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn MailMon: mailMon, DS: ds, }, - CSAETHKeystore: keyStore, - MercuryTransmitter: cfg.Mercury().Transmitter(), + CSAETHKeystore: keyStore, } if cfg.EVMEnabled() { diff --git a/core/services/chainlink/config_mercury.go b/core/services/chainlink/config_mercury.go index 1b64e0bde45..27303a68899 100644 --- a/core/services/chainlink/config_mercury.go +++ b/core/services/chainlink/config_mercury.go @@ -3,7 +3,6 @@ package chainlink import ( "time" - commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink/v2/core/config" @@ -26,8 +25,6 @@ func (m *mercuryCacheConfig) LatestReportDeadline() time.Duration { return m.c.LatestReportDeadline.Duration() } -var _ config.MercuryTLS = (*mercuryTLSConfig)(nil) - type mercuryTLSConfig struct { c toml.MercuryTLS } @@ -36,20 +33,6 @@ func (m *mercuryTLSConfig) CertFile() string { return *m.c.CertFile } -var _ config.MercuryTransmitter = (*mercuryTransmitterConfig)(nil) - -type mercuryTransmitterConfig struct { - c toml.MercuryTransmitter -} - -func (m *mercuryTransmitterConfig) TransmitQueueMaxSize() uint32 { - return *m.c.TransmitQueueMaxSize -} - -func (m *mercuryTransmitterConfig) TransmitTimeout() commonconfig.Duration { - return *m.c.TransmitTimeout -} - type mercuryConfig struct { c toml.Mercury s toml.MercurySecrets @@ -77,7 +60,3 @@ func (m *mercuryConfig) Cache() config.MercuryCache { func (m *mercuryConfig) TLS() config.MercuryTLS { return &mercuryTLSConfig{c: m.c.TLS} } - -func (m *mercuryConfig) Transmitter() config.MercuryTransmitter { - return &mercuryTransmitterConfig{c: m.c.Transmitter} -} diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index 0d40697345d..d02948fd07b 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -710,10 +710,6 @@ func TestConfig_Marshal(t *testing.T) { TLS: toml.MercuryTLS{ CertFile: ptr("/path/to/cert.pem"), }, - Transmitter: toml.MercuryTransmitter{ - TransmitQueueMaxSize: ptr(uint32(123)), - TransmitTimeout: commoncfg.MustNewDuration(234 * time.Second), - }, } for _, tt := range []struct { @@ -1169,10 +1165,6 @@ LatestReportDeadline = '1m42s' [Mercury.TLS] CertFile = '/path/to/cert.pem' - -[Mercury.Transmitter] -TransmitQueueMaxSize = 123 -TransmitTimeout = '3m54s' `}, {"full", full, fullTOML}, {"multi-chain", multiChain, multiChainTOML}, diff --git a/core/services/chainlink/relayer_factory.go b/core/services/chainlink/relayer_factory.go index 31645b7c54d..00db81cce37 100644 --- a/core/services/chainlink/relayer_factory.go +++ b/core/services/chainlink/relayer_factory.go @@ -19,7 +19,6 @@ import ( "github.com/smartcontractkit/chainlink-starknet/relayer/pkg/chainlink/config" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" - coreconfig "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/config/env" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" @@ -39,7 +38,6 @@ type RelayerFactory struct { type EVMFactoryConfig struct { legacyevm.ChainOpts evmrelay.CSAETHKeystore - coreconfig.MercuryTransmitter } func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (map[types.RelayID]evmrelay.LoopRelayAdapter, error) { @@ -69,10 +67,9 @@ func (r *RelayerFactory) NewEVM(ctx context.Context, config EVMFactoryConfig) (m } relayerOpts := evmrelay.RelayerOpts{ - DS: ccOpts.DS, - CSAETHKeystore: config.CSAETHKeystore, - MercuryPool: r.MercuryPool, - TransmitterConfig: config.MercuryTransmitter, + DS: ccOpts.DS, + CSAETHKeystore: config.CSAETHKeystore, + MercuryPool: r.MercuryPool, } relayer, err2 := evmrelay.NewRelayer(lggr.Named(relayID.ChainID), chain, relayerOpts) if err2 != nil { diff --git a/core/services/chainlink/testdata/config-empty-effective.toml b/core/services/chainlink/testdata/config-empty-effective.toml index 38c3ed62017..759a380d15c 100644 --- a/core/services/chainlink/testdata/config-empty-effective.toml +++ b/core/services/chainlink/testdata/config-empty-effective.toml @@ -230,10 +230,6 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' -[Mercury.Transmitter] -TransmitQueueMaxSize = 10000 -TransmitTimeout = '5s' - [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml index b199ae530f5..8a016149e59 100644 --- a/core/services/chainlink/testdata/config-full.toml +++ b/core/services/chainlink/testdata/config-full.toml @@ -240,10 +240,6 @@ LatestReportDeadline = '1m42s' [Mercury.TLS] CertFile = '/path/to/cert.pem' -[Mercury.Transmitter] -TransmitQueueMaxSize = 123 -TransmitTimeout = '3m54s' - [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 13 diff --git a/core/services/chainlink/testdata/config-multi-chain-effective.toml b/core/services/chainlink/testdata/config-multi-chain-effective.toml index 7aa3bb50b35..a6cba2aaac3 100644 --- a/core/services/chainlink/testdata/config-multi-chain-effective.toml +++ b/core/services/chainlink/testdata/config-multi-chain-effective.toml @@ -230,10 +230,6 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' -[Mercury.Transmitter] -TransmitQueueMaxSize = 10000 -TransmitTimeout = '5s' - [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 4e1eb0cc623..dbde65efe40 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -198,7 +198,6 @@ type mercuryConfig interface { Credentials(credName string) *types.MercuryCredentials Cache() coreconfig.MercuryCache TLS() coreconfig.MercuryTLS - Transmitter() coreconfig.MercuryTransmitter } type thresholdConfig interface { diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 737a8e7561e..9097c217590 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -78,8 +78,7 @@ type Relayer struct { codec commontypes.Codec // Mercury - mercuryORM mercury.ORM - transmitterCfg mercury.TransmitterConfig + mercuryORM mercury.ORM // LLO/data streams cdcFactory llo.ChannelDefinitionCacheFactory @@ -94,8 +93,7 @@ type CSAETHKeystore interface { type RelayerOpts struct { DS sqlutil.DataSource CSAETHKeystore - MercuryPool wsrpc.Pool - TransmitterConfig mercury.TransmitterConfig + MercuryPool wsrpc.Pool } func (c RelayerOpts) Validate() error { @@ -124,15 +122,14 @@ func NewRelayer(lggr logger.Logger, chain legacyevm.Chain, opts RelayerOpts) (*R lloORM := llo.NewORM(opts.DS, chain.ID()) cdcFactory := llo.NewChannelDefinitionCacheFactory(lggr, lloORM, chain.LogPoller()) return &Relayer{ - ds: opts.DS, - chain: chain, - lggr: lggr, - ks: opts.CSAETHKeystore, - mercuryPool: opts.MercuryPool, - cdcFactory: cdcFactory, - lloORM: lloORM, - mercuryORM: mercuryORM, - transmitterCfg: opts.TransmitterConfig, + ds: opts.DS, + chain: chain, + lggr: lggr, + ks: opts.CSAETHKeystore, + mercuryPool: opts.MercuryPool, + cdcFactory: cdcFactory, + lloORM: lloORM, + mercuryORM: mercuryORM, }, nil } @@ -249,7 +246,7 @@ func (r *Relayer) NewMercuryProvider(rargs commontypes.RelayArgs, pargs commonty default: return nil, fmt.Errorf("invalid feed version %d", feedID.Version()) } - transmitter := mercury.NewTransmitter(lggr, r.transmitterCfg, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec) + transmitter := mercury.NewTransmitter(lggr, clients, privKey.PublicKey, rargs.JobID, *relayConfig.FeedID, r.mercuryORM, transmitterCodec) return NewMercuryProvider(cp, r.chainReader, r.codec, NewMercuryChainReader(r.chain.HeadTracker()), transmitter, reportCodecV1, reportCodecV2, reportCodecV3, lggr), nil } diff --git a/core/services/relay/evm/mercury/transmitter.go b/core/services/relay/evm/mercury/transmitter.go index 82a76450e5f..6f49ca91bfc 100644 --- a/core/services/relay/evm/mercury/transmitter.go +++ b/core/services/relay/evm/mercury/transmitter.go @@ -23,7 +23,6 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" - commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types/mercury" @@ -34,6 +33,12 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/utils" ) +var ( + maxTransmitQueueSize = 10_000 + maxDeleteQueueSize = 10_000 + transmitTimeout = 5 * time.Second +) + const ( // Mercury server error codes DuplicateReport = 2 @@ -99,15 +104,9 @@ type TransmitterReportDecoder interface { var _ Transmitter = (*mercuryTransmitter)(nil) -type TransmitterConfig interface { - TransmitQueueMaxSize() uint32 - TransmitTimeout() commonconfig.Duration -} - type mercuryTransmitter struct { services.StateMachine lggr logger.Logger - cfg TransmitterConfig servers map[string]*server @@ -143,8 +142,6 @@ func getPayloadTypes() abi.Arguments { type server struct { lggr logger.Logger - transmitTimeout time.Duration - c wsrpc.Client pm *PersistenceManager q *TransmitQueue @@ -224,7 +221,7 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed // queue was closed return } - ctx, cancel := context.WithTimeout(runloopCtx, utils.WithJitter(s.transmitTimeout)) + ctx, cancel := context.WithTimeout(runloopCtx, utils.WithJitter(transmitTimeout)) res, err := s.c.Transmit(ctx, t.Req) cancel() if runloopCtx.Err() != nil { @@ -275,19 +272,18 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed } } -func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder) *mercuryTransmitter { +func NewTransmitter(lggr logger.Logger, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder) *mercuryTransmitter { feedIDHex := fmt.Sprintf("0x%x", feedID[:]) servers := make(map[string]*server, len(clients)) for serverURL, client := range clients { cLggr := lggr.Named(serverURL).With("serverURL", serverURL) - pm := NewPersistenceManager(cLggr, serverURL, orm, jobID, int(cfg.TransmitQueueMaxSize()), flushDeletesFrequency, pruneFrequency) + pm := NewPersistenceManager(cLggr, serverURL, orm, jobID, maxTransmitQueueSize, flushDeletesFrequency, pruneFrequency) servers[serverURL] = &server{ cLggr, - cfg.TransmitTimeout().Duration(), client, pm, - NewTransmitQueue(cLggr, serverURL, feedIDHex, int(cfg.TransmitQueueMaxSize()), pm), - make(chan *pb.TransmitRequest, int(cfg.TransmitQueueMaxSize())), + NewTransmitQueue(cLggr, serverURL, feedIDHex, maxTransmitQueueSize, pm), + make(chan *pb.TransmitRequest, maxDeleteQueueSize), transmitSuccessCount.WithLabelValues(feedIDHex, serverURL), transmitDuplicateCount.WithLabelValues(feedIDHex, serverURL), transmitConnectionErrorCount.WithLabelValues(feedIDHex, serverURL), @@ -299,7 +295,6 @@ func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[strin return &mercuryTransmitter{ services.StateMachine{}, lggr.Named("MercuryTransmitter").With("feedID", feedIDHex), - cfg, servers, codec, feedID, diff --git a/core/services/relay/evm/mercury/transmitter_test.go b/core/services/relay/evm/mercury/transmitter_test.go index b0da9bea635..46bf116ed3a 100644 --- a/core/services/relay/evm/mercury/transmitter_test.go +++ b/core/services/relay/evm/mercury/transmitter_test.go @@ -4,7 +4,6 @@ import ( "context" "math/big" "testing" - "time" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/pkg/errors" @@ -13,7 +12,6 @@ import ( ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" - commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -23,16 +21,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" ) -type mockCfg struct{} - -func (m mockCfg) TransmitQueueMaxSize() uint32 { - return 10_000 -} - -func (m mockCfg) TransmitTimeout() commonconfig.Duration { - return *commonconfig.MustNewDuration(1 * time.Hour) -} - func Test_MercuryTransmitter_Transmit(t *testing.T) { lggr := logger.TestLogger(t) db := pgtest.NewSqlxDB(t) @@ -48,7 +36,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { report := sampleV1Report c := &mocks.MockWSRPCClient{} clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) // init the queue since we skipped starting transmitter mt.servers[sURL].q.Init([]*Transmission{}) err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) @@ -62,7 +50,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { report := sampleV2Report c := &mocks.MockWSRPCClient{} clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) // init the queue since we skipped starting transmitter mt.servers[sURL].q.Init([]*Transmission{}) err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) @@ -76,7 +64,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { report := sampleV3Report c := &mocks.MockWSRPCClient{} clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) // init the queue since we skipped starting transmitter mt.servers[sURL].q.Init([]*Transmission{}) err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) @@ -95,7 +83,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { clients[sURL2] = c clients[sURL3] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) // init the queue since we skipped starting transmitter mt.servers[sURL].q.Init([]*Transmission{}) mt.servers[sURL2].q.Init([]*Transmission{}) @@ -137,7 +125,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) ts, err := mt.LatestTimestamp(testutils.Context(t)) require.NoError(t, err) @@ -153,7 +141,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) ts, err := mt.LatestTimestamp(testutils.Context(t)) require.NoError(t, err) @@ -167,7 +155,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) _, err := mt.LatestTimestamp(testutils.Context(t)) require.Error(t, err) assert.Contains(t, err.Error(), "something exploded") @@ -197,7 +185,7 @@ func Test_MercuryTransmitter_LatestTimestamp(t *testing.T) { return out, nil }, } - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) ts, err := mt.LatestTimestamp(testutils.Context(t)) require.NoError(t, err) @@ -240,7 +228,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) t.Run("BenchmarkPriceFromReport succeeds", func(t *testing.T) { codec.val = originalPrice @@ -271,7 +259,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) price, err := mt.LatestPrice(testutils.Context(t), sampleFeedID) require.NoError(t, err) @@ -285,7 +273,7 @@ func Test_MercuryTransmitter_LatestPrice(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) _, err := mt.LatestPrice(testutils.Context(t), sampleFeedID) require.Error(t, err) assert.Contains(t, err.Error(), "something exploded") @@ -315,7 +303,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) bn, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.NoError(t, err) @@ -331,7 +319,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) bn, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.NoError(t, err) @@ -344,7 +332,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) _, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.Error(t, err) assert.Contains(t, err.Error(), "something exploded") @@ -362,7 +350,7 @@ func Test_MercuryTransmitter_FetchInitialMaxFinalizedBlockNumber(t *testing.T) { }, } clients[sURL] = c - mt := NewTransmitter(lggr, mockCfg{}, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) + mt := NewTransmitter(lggr, clients, sampleClientPubKey, jobID, sampleFeedID, orm, codec) _, err := mt.FetchInitialMaxFinalizedBlockNumber(testutils.Context(t)) require.Error(t, err) assert.Contains(t, err.Error(), "latestReport failed; mismatched feed IDs, expected: 0x1c916b4aa7e57ca7b68ae1bf45653f56b656fd3aa335ef7fae696b663f1b8472, got: 0x") diff --git a/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go b/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go index 0c31a1d7ac9..4d05db4380f 100644 --- a/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go +++ b/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go @@ -11,7 +11,6 @@ import ( ) // MercuryClient is the client API for Mercury service. -// type MercuryClient interface { Transmit(ctx context.Context, in *TransmitRequest) (*TransmitResponse, error) LatestReport(ctx context.Context, in *LatestReportRequest) (*LatestReportResponse, error) diff --git a/core/web/resolver/testdata/config-empty-effective.toml b/core/web/resolver/testdata/config-empty-effective.toml index 38c3ed62017..759a380d15c 100644 --- a/core/web/resolver/testdata/config-empty-effective.toml +++ b/core/web/resolver/testdata/config-empty-effective.toml @@ -230,10 +230,6 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' -[Mercury.Transmitter] -TransmitQueueMaxSize = 10000 -TransmitTimeout = '5s' - [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/core/web/resolver/testdata/config-full.toml b/core/web/resolver/testdata/config-full.toml index 75fad4d2fc9..69d56974130 100644 --- a/core/web/resolver/testdata/config-full.toml +++ b/core/web/resolver/testdata/config-full.toml @@ -240,10 +240,6 @@ LatestReportDeadline = '1m42s' [Mercury.TLS] CertFile = '' -[Mercury.Transmitter] -TransmitQueueMaxSize = 123 -TransmitTimeout = '3m54s' - [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 13 diff --git a/core/web/resolver/testdata/config-multi-chain-effective.toml b/core/web/resolver/testdata/config-multi-chain-effective.toml index 7aa3bb50b35..a6cba2aaac3 100644 --- a/core/web/resolver/testdata/config-multi-chain-effective.toml +++ b/core/web/resolver/testdata/config-multi-chain-effective.toml @@ -230,10 +230,6 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' -[Mercury.Transmitter] -TransmitQueueMaxSize = 10000 -TransmitTimeout = '5s' - [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/docs/CONFIG.md b/docs/CONFIG.md index f93d990413f..0596fcdd84d 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -1698,33 +1698,6 @@ CertFile = "/path/to/client/certs.pem" # Example ``` CertFile is the path to a PEM file of trusted root certificate authority certificates -## Mercury.Transmitter -```toml -[Mercury.Transmitter] -TransmitQueueMaxSize = 10_000 # Default -TransmitTimeout = "5s" # Default -``` -Mercury.Transmitter controls settings for the mercury transmitter - -### TransmitQueueMaxSize -```toml -TransmitQueueMaxSize = 10_000 # Default -``` -TransmitQueueMaxSize controls the size of the transmit queue. This is scoped -per OCR instance. If the queue is full, the transmitter will start dropping -the oldest messages in order to make space. - -This is useful if mercury server goes offline and the nop needs to buffer -transmissions. - -### TransmitTimeout -```toml -TransmitTimeout = "5s" # Default -``` -TransmitTimeout controls how long the transmitter will wait for a response -when sending a message to the mercury server, before aborting and considering -the transmission to be failed. - ## EVM EVM defaults depend on ChainID: diff --git a/testdata/scripts/node/validate/default.txtar b/testdata/scripts/node/validate/default.txtar index a8e8e41750d..dd3af5f91b6 100644 --- a/testdata/scripts/node/validate/default.txtar +++ b/testdata/scripts/node/validate/default.txtar @@ -242,10 +242,6 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' -[Mercury.Transmitter] -TransmitQueueMaxSize = 10000 -TransmitTimeout = '5s' - [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar index feaf546f022..15a476460da 100644 --- a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar @@ -286,10 +286,6 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' -[Mercury.Transmitter] -TransmitQueueMaxSize = 10000 -TransmitTimeout = '5s' - [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar index b37fed41150..cc8b4577bfb 100644 --- a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar @@ -286,10 +286,6 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' -[Mercury.Transmitter] -TransmitQueueMaxSize = 10000 -TransmitTimeout = '5s' - [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/testdata/scripts/node/validate/disk-based-logging.txtar b/testdata/scripts/node/validate/disk-based-logging.txtar index 6ae02ab38f4..c578d200923 100644 --- a/testdata/scripts/node/validate/disk-based-logging.txtar +++ b/testdata/scripts/node/validate/disk-based-logging.txtar @@ -286,10 +286,6 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' -[Mercury.Transmitter] -TransmitQueueMaxSize = 10000 -TransmitTimeout = '5s' - [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar index 45c97477bd5..91ae520532d 100644 --- a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar +++ b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar @@ -271,10 +271,6 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' -[Mercury.Transmitter] -TransmitQueueMaxSize = 10000 -TransmitTimeout = '5s' - [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/testdata/scripts/node/validate/invalid.txtar b/testdata/scripts/node/validate/invalid.txtar index df0118bbbbf..a5e4b766b6e 100644 --- a/testdata/scripts/node/validate/invalid.txtar +++ b/testdata/scripts/node/validate/invalid.txtar @@ -276,10 +276,6 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' -[Mercury.Transmitter] -TransmitQueueMaxSize = 10000 -TransmitTimeout = '5s' - [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/testdata/scripts/node/validate/valid.txtar b/testdata/scripts/node/validate/valid.txtar index edb07fd5e4f..c220d7f2e5f 100644 --- a/testdata/scripts/node/validate/valid.txtar +++ b/testdata/scripts/node/validate/valid.txtar @@ -283,10 +283,6 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' -[Mercury.Transmitter] -TransmitQueueMaxSize = 10000 -TransmitTimeout = '5s' - [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10 diff --git a/testdata/scripts/node/validate/warnings.txtar b/testdata/scripts/node/validate/warnings.txtar index cf121e959e1..018aaf95f4c 100644 --- a/testdata/scripts/node/validate/warnings.txtar +++ b/testdata/scripts/node/validate/warnings.txtar @@ -265,10 +265,6 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' -[Mercury.Transmitter] -TransmitQueueMaxSize = 10000 -TransmitTimeout = '5s' - [Capabilities] [Capabilities.Peering] IncomingMessageBufferSize = 10