Merge branch 'develop' into BCI-2941-Implement-auto-purge-stuck-transaction-feature-in-TXM
amit-momin authored May 23, 2024
2 parents d81da52 + c7a6356 commit a33460b
Showing 154 changed files with 2,325 additions and 1,156 deletions.
5 changes: 5 additions & 0 deletions .changeset/blue-camels-begin.md
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

enforce proper result indexing on pipeline results #breaking_change
5 changes: 5 additions & 0 deletions .changeset/blue-camels-promise.md
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#changed Remove ClientErrors interface from common
5 changes: 5 additions & 0 deletions .changeset/silent-jars-relax.md
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal [Keystone] EVM encoder support for tuples
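
The Keystone entry above notes tuple support in the EVM encoder. For orientation only, the sketch below shows what ABI-encoding a tuple looks like with go-ethereum's abi package; the field names and values are invented, and this is not the Keystone encoder itself.

package main

import (
	"fmt"
	"log"
	"math/big"

	"github.com/ethereum/go-ethereum/accounts/abi"
	"github.com/ethereum/go-ethereum/common"
)

func main() {
	// An ABI tuple type equivalent to: (address addr, uint256 amount)
	tupleTy, err := abi.NewType("tuple", "", []abi.ArgumentMarshaling{
		{Name: "addr", Type: "address"},
		{Name: "amount", Type: "uint256"},
	})
	if err != nil {
		log.Fatal(err)
	}

	args := abi.Arguments{{Type: tupleTy}}
	// Struct field names map onto the tuple component names (addr -> Addr).
	packed, err := args.Pack(struct {
		Addr   common.Address
		Amount *big.Int
	}{
		Addr:   common.HexToAddress("0x000000000000000000000000000000000000dEaD"),
		Amount: big.NewInt(42),
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%x\n", packed)
}
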
5 changes: 5 additions & 0 deletions .changeset/tame-mice-give.md
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#db_update Add ON DELETE CASCADE to workflow tables
5 changes: 5 additions & 0 deletions .changeset/wild-berries-cry.md
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#db_update Add name to workflow spec. Add unique constraint to (owner,name) for workflow spec
5 changes: 5 additions & 0 deletions .changeset/young-candles-brush.md
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#bugfix allow ChainType to be set to xdai
6 changes: 5 additions & 1 deletion .github/workflows/automation-nightly-tests.yml
@@ -129,6 +129,7 @@ jobs:
cl_image_tag: 'latest'
aws_registries: ${{ secrets.QA_AWS_ACCOUNT_NUMBER }}
artifacts_location: ./integration-tests/${{ matrix.tests.suite }}/logs
artifacts_name: testcontainers-logs-${{ matrix.tests.name }}
publish_check_name: Automation Results ${{ matrix.tests.name }}
token: ${{ secrets.GITHUB_TOKEN }}
go_mod_path: ./integration-tests/go.mod
@@ -139,7 +140,7 @@ jobs:
uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1
if: failure()
with:
name: test-log-${{ matrix.tests.name }}
name: gotest-logs-${{ matrix.tests.name }}
path: /tmp/gotest.log
retention-days: 7
continue-on-error: true
@@ -155,6 +156,9 @@ jobs:
this-job-name: Automation ${{ matrix.tests.name }} Test
test-results-file: '{"testType":"go","filePath":"/tmp/gotest.log"}'
continue-on-error: true
- name: Print failed test summary
if: always()
uses: smartcontractkit/chainlink-github-actions/chainlink-testing-framework/show-test-summary@b49a9d04744b0237908831730f8553f26d73a94b # v2.3.17

test-notify:
name: Start Slack Thread
10 changes: 9 additions & 1 deletion .github/workflows/integration-tests.yml
@@ -1214,7 +1214,7 @@ jobs:
QA_AWS_REGION: ${{ secrets.QA_AWS_REGION }}
QA_AWS_ROLE_TO_ASSUME: ${{ secrets.QA_AWS_ROLE_TO_ASSUME }}
QA_KUBECONFIG: ${{ secrets.QA_KUBECONFIG }}
- name: Pull Artfacts
- name: Pull Artifacts
if: needs.changes.outputs.src == 'true' || github.event_name == 'workflow_dispatch'
run: |
IMAGE_NAME=${{ secrets.QA_AWS_ACCOUNT_NUMBER }}.dkr.ecr.${{ secrets.QA_AWS_REGION }}.amazonaws.com/chainlink-solana-tests:${{ needs.get_solana_sha.outputs.sha }}
@@ -1232,12 +1232,20 @@
docker rm "$CONTAINER_ID"
- name: Install Solana CLI # required for ensuring the local test validator is configured correctly
run: ./scripts/install-solana-ci.sh
- name: Install gauntlet
run: |
yarn --cwd ./gauntlet install --frozen-lockfile
yarn --cwd ./gauntlet build
yarn --cwd ./gauntlet gauntlet
- name: Generate config overrides
run: | # https://github.com/smartcontractkit/chainlink-testing-framework/blob/main/config/README.md
cat << EOF > config.toml
[ChainlinkImage]
image="${{ env.CHAINLINK_IMAGE }}"
version="${{ inputs.evm-ref || github.sha }}"
[Common]
user="${{ github.actor }}"
internal_docker_repo = "${{ secrets.QA_AWS_ACCOUNT_NUMBER }}.dkr.ecr.${{ secrets.QA_AWS_REGION }}.amazonaws.com"
EOF
# shellcheck disable=SC2002
BASE64_CONFIG_OVERRIDE=$(cat config.toml | base64 -w 0)
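
The "Generate config overrides" step above writes a small TOML file and exports it base64-encoded (BASE64_CONFIG_OVERRIDE) for the chainlink-testing-framework to consume. As a rough sketch of how a test harness might read such an override — the struct fields and variable name follow the workflow above, but this decoder is illustrative, not the framework's actual loader:

package main

import (
	"encoding/base64"
	"fmt"
	"log"
	"os"

	"github.com/pelletier/go-toml/v2"
)

// testConfig mirrors the sections written by the workflow's config.toml.
type testConfig struct {
	ChainlinkImage struct {
		Image   string `toml:"image"`
		Version string `toml:"version"`
	} `toml:"ChainlinkImage"`
	Common struct {
		User               string `toml:"user"`
		InternalDockerRepo string `toml:"internal_docker_repo"`
	} `toml:"Common"`
}

func main() {
	// The CI job exports the TOML override base64-encoded in this variable.
	raw, err := base64.StdEncoding.DecodeString(os.Getenv("BASE64_CONFIG_OVERRIDE"))
	if err != nil {
		log.Fatalf("decode override: %v", err)
	}
	var cfg testConfig
	if err := toml.Unmarshal(raw, &cfg); err != nil {
		log.Fatalf("parse override: %v", err)
	}
	fmt.Printf("image=%s version=%s user=%s\n",
		cfg.ChainlinkImage.Image, cfg.ChainlinkImage.Version, cfg.Common.User)
}
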
1 change: 1 addition & 0 deletions .golangci.yml
@@ -15,6 +15,7 @@ linters:
- noctx
- depguard
- whitespace
- containedctx
linters-settings:
exhaustive:
default-signifies-exhaustive: true
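
The newly enabled containedctx linter flags struct fields of type context.Context — exactly the pattern removed from node.go below (nodeCtx stored on the struct). A minimal before/after sketch of what the linter objects to; the type and field names here are illustrative only:

package worker

import "context"

// Flagged by containedctx: a context stored in a struct outlives the call
// that created it and hides cancellation from readers of the methods.
type badWorker struct {
	ctx context.Context
}

// Preferred: keep a stop channel on the struct and derive per-call contexts,
// or pass ctx explicitly to each method.
type goodWorker struct {
	stopCh chan struct{}
}

func (w *goodWorker) run(ctx context.Context) {
	select {
	case <-ctx.Done():
	case <-w.stopCh:
	}
}
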
12 changes: 3 additions & 9 deletions common/client/node.go
@@ -9,8 +9,6 @@ import (
"sync"
"time"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

@@ -47,7 +45,6 @@ type NodeConfig interface {
SyncThreshold() uint32
NodeIsSyncingEnabled() bool
FinalizedBlockPollInterval() time.Duration
Errors() config.ClientErrors
}

type ChainConfig interface {
@@ -106,10 +103,7 @@ type node[
stateLatestTotalDifficulty *big.Int
stateLatestFinalizedBlockNumber int64

// nodeCtx is the node lifetime's context
nodeCtx context.Context
// cancelNodeCtx cancels nodeCtx when stopping the node
cancelNodeCtx context.CancelFunc
stopCh services.StopChan
// wg waits for subsidiary goroutines
wg sync.WaitGroup

@@ -148,7 +142,7 @@ func NewNode[
if httpuri != nil {
n.http = httpuri
}
n.nodeCtx, n.cancelNodeCtx = context.WithCancel(context.Background())
n.stopCh = make(services.StopChan)
lggr = logger.Named(lggr, "Node")
lggr = logger.With(lggr,
"nodeTier", Primary.String(),
@@ -205,7 +199,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) close() error {
n.stateMu.Lock()
defer n.stateMu.Unlock()

n.cancelNodeCtx()
close(n.stopCh)
n.state = nodeStateClosed
return nil
}
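
The change above swaps the struct-held nodeCtx/cancelNodeCtx pair for a services.StopChan: a single channel that close() closes once, and from which each lifecycle goroutine derives its own context. A minimal sketch of the pattern follows; it re-implements the idea for illustration and is not the chainlink-common services.StopChan code.

package main

import (
	"context"
	"fmt"
	"time"
)

// stopChan mimics the StopChan idea: close it once to stop everything.
type stopChan chan struct{}

// newCtx derives a context that is cancelled when either the returned
// CancelFunc is called or the stop channel is closed.
func (s stopChan) newCtx() (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		select {
		case <-s:
			cancel()
		case <-ctx.Done():
		}
	}()
	return ctx, cancel
}

func main() {
	stop := make(stopChan)

	go func() { // a lifecycle loop, like aliveLoop in the diff
		ctx, cancel := stop.newCtx()
		defer cancel()
		<-ctx.Done()
		fmt.Println("loop stopped:", ctx.Err())
	}()

	time.Sleep(10 * time.Millisecond)
	close(stop) // the equivalent of close(n.stopCh) in close()
	time.Sleep(10 * time.Millisecond)
}

Keeping only the channel on the struct also satisfies the containedctx linter enabled earlier in this commit, since no context.Context value lives in a struct field.
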
55 changes: 34 additions & 21 deletions common/client/node_lifecycle.go
@@ -79,6 +79,8 @@ const rpcSubscriptionMethodNewHeads = "newHeads"
// Should only be run ONCE per node, after a successful Dial
func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
defer n.wg.Done()
ctx, cancel := n.stopCh.NewCtx()
defer cancel()

{
// sanity check
@@ -100,7 +102,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
lggr.Tracew("Alive loop starting", "nodeState", n.State())

headsC := make(chan HEAD)
sub, err := n.rpc.Subscribe(n.nodeCtx, headsC, rpcSubscriptionMethodNewHeads)
sub, err := n.rpc.Subscribe(ctx, headsC, rpcSubscriptionMethodNewHeads)
if err != nil {
lggr.Errorw("Initial subscribe for heads failed", "nodeState", n.State())
n.declareUnreachable()
@@ -151,15 +153,16 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {

for {
select {
case <-n.nodeCtx.Done():
case <-ctx.Done():
return
case <-pollCh:
var version string
promPoolRPCNodePolls.WithLabelValues(n.chainID.String(), n.name).Inc()
lggr.Tracew("Polling for version", "nodeState", n.State(), "pollFailures", pollFailures)
ctx, cancel := context.WithTimeout(n.nodeCtx, pollInterval)
version, err := n.RPC().ClientVersion(ctx)
cancel()
version, err := func(ctx context.Context) (string, error) {
ctx, cancel := context.WithTimeout(ctx, pollInterval)
defer cancel()
return n.RPC().ClientVersion(ctx)
}(ctx)
if err != nil {
// prevent overflow
if pollFailures < math.MaxUint32 {
@@ -240,9 +243,11 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
n.declareOutOfSync(func(num int64, td *big.Int) bool { return num < highestReceivedBlockNumber })
return
case <-pollFinalizedHeadCh:
ctx, cancel := context.WithTimeout(n.nodeCtx, n.nodePoolCfg.FinalizedBlockPollInterval())
latestFinalized, err := n.RPC().LatestFinalizedBlock(ctx)
cancel()
latestFinalized, err := func(ctx context.Context) (HEAD, error) {
ctx, cancel := context.WithTimeout(ctx, n.nodePoolCfg.FinalizedBlockPollInterval())
defer cancel()
return n.RPC().LatestFinalizedBlock(ctx)
}(ctx)
if err != nil {
lggr.Warnw("Failed to fetch latest finalized block", "err", err)
continue
@@ -300,6 +305,8 @@ const (
// outOfSyncLoop takes an OutOfSync node and waits until isOutOfSync returns false to go back to live status
func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(isOutOfSync func(num int64, td *big.Int) bool) {
defer n.wg.Done()
ctx, cancel := n.stopCh.NewCtx()
defer cancel()

{
// sanity check
@@ -319,7 +326,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(isOutOfSync func(num int64, td
lggr.Debugw("Trying to revive out-of-sync RPC node", "nodeState", n.State())

// Need to redial since out-of-sync nodes are automatically disconnected
state := n.createVerifiedConn(n.nodeCtx, lggr)
state := n.createVerifiedConn(ctx, lggr)
if state != nodeStateAlive {
n.declareState(state)
return
@@ -328,7 +335,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(isOutOfSync func(num int64, td
lggr.Tracew("Successfully subscribed to heads feed on out-of-sync RPC node", "nodeState", n.State())

ch := make(chan HEAD)
sub, err := n.rpc.Subscribe(n.nodeCtx, ch, rpcSubscriptionMethodNewHeads)
sub, err := n.rpc.Subscribe(ctx, ch, rpcSubscriptionMethodNewHeads)
if err != nil {
lggr.Errorw("Failed to subscribe heads on out-of-sync RPC node", "nodeState", n.State(), "err", err)
n.declareUnreachable()
@@ -338,7 +345,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(isOutOfSync func(num int64, td

for {
select {
case <-n.nodeCtx.Done():
case <-ctx.Done():
return
case head, open := <-ch:
if !open {
@@ -372,6 +379,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(isOutOfSync func(num int64, td

func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() {
defer n.wg.Done()
ctx, cancel := n.stopCh.NewCtx()
defer cancel()

{
// sanity check
@@ -394,20 +403,20 @@

for {
select {
case <-n.nodeCtx.Done():
case <-ctx.Done():
return
case <-time.After(dialRetryBackoff.Duration()):
lggr.Tracew("Trying to re-dial RPC node", "nodeState", n.State())

err := n.rpc.Dial(n.nodeCtx)
err := n.rpc.Dial(ctx)
if err != nil {
lggr.Errorw(fmt.Sprintf("Failed to redial RPC node; still unreachable: %v", err), "err", err, "nodeState", n.State())
continue
}

n.setState(nodeStateDialed)

state := n.verifyConn(n.nodeCtx, lggr)
state := n.verifyConn(ctx, lggr)
switch state {
case nodeStateUnreachable:
n.setState(nodeStateUnreachable)
@@ -425,6 +434,8 @@

func (n *node[CHAIN_ID, HEAD, RPC]) invalidChainIDLoop() {
defer n.wg.Done()
ctx, cancel := n.stopCh.NewCtx()
defer cancel()

{
// sanity check
@@ -443,7 +454,7 @@
lggr := logger.Named(n.lfcLog, "InvalidChainID")

// Need to redial since invalid chain ID nodes are automatically disconnected
state := n.createVerifiedConn(n.nodeCtx, lggr)
state := n.createVerifiedConn(ctx, lggr)
if state != nodeStateInvalidChainID {
n.declareState(state)
return
@@ -455,10 +466,10 @@

for {
select {
case <-n.nodeCtx.Done():
case <-ctx.Done():
return
case <-time.After(chainIDRecheckBackoff.Duration()):
state := n.verifyConn(n.nodeCtx, lggr)
state := n.verifyConn(ctx, lggr)
switch state {
case nodeStateInvalidChainID:
continue
@@ -475,6 +486,8 @@

func (n *node[CHAIN_ID, HEAD, RPC]) syncingLoop() {
defer n.wg.Done()
ctx, cancel := n.stopCh.NewCtx()
defer cancel()

{
// sanity check
Expand All @@ -493,7 +506,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) syncingLoop() {
lggr := logger.Sugared(logger.Named(n.lfcLog, "Syncing"))
lggr.Debugw(fmt.Sprintf("Periodically re-checking RPC node %s with syncing status", n.String()), "nodeState", n.State())
// Need to redial since syncing nodes are automatically disconnected
state := n.createVerifiedConn(n.nodeCtx, lggr)
state := n.createVerifiedConn(ctx, lggr)
if state != nodeStateSyncing {
n.declareState(state)
return
@@ -503,11 +516,11 @@

for {
select {
case <-n.nodeCtx.Done():
case <-ctx.Done():
return
case <-time.After(recheckBackoff.Duration()):
lggr.Tracew("Trying to recheck if the node is still syncing", "nodeState", n.State())
isSyncing, err := n.rpc.IsSyncing(n.nodeCtx)
isSyncing, err := n.rpc.IsSyncing(ctx)
if err != nil {
lggr.Errorw("Unexpected error while verifying RPC node synchronization status", "err", err, "nodeState", n.State())
n.declareUnreachable()
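
Besides deriving a per-loop ctx from stopCh, the node_lifecycle.go hunks above wrap each timed RPC call in an immediately invoked function literal, so the timeout's cancel runs via defer on every return path rather than being called manually after the call. A standalone sketch of that pattern; clientVersion below is a stand-in for a call such as n.RPC().ClientVersion(ctx), not the real client:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// clientVersion simulates an RPC call that respects context cancellation.
func clientVersion(ctx context.Context) (string, error) {
	select {
	case <-time.After(50 * time.Millisecond):
		return "geth/v1.13", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	ctx := context.Background()
	pollInterval := 10 * time.Millisecond

	// The timeout is scoped to this one call; defer guarantees cancel()
	// runs no matter which path returns, mirroring the pattern in aliveLoop.
	version, err := func(ctx context.Context) (string, error) {
		ctx, cancel := context.WithTimeout(ctx, pollInterval)
		defer cancel()
		return clientVersion(ctx)
	}(ctx)

	if errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("poll timed out, counted as a poll failure")
		return
	}
	fmt.Println("version:", version, "err:", err)
}
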
6 changes: 0 additions & 6 deletions common/client/node_test.go
@@ -9,7 +9,6 @@ import (

clientMocks "github.com/smartcontractkit/chainlink/v2/common/client/mocks"
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config"
)

type testNodeConfig struct {
@@ -19,7 +18,6 @@ type testNodeConfig struct {
syncThreshold uint32
nodeIsSyncingEnabled bool
finalizedBlockPollInterval time.Duration
errors config.ClientErrors
}

func (n testNodeConfig) PollFailureThreshold() uint32 {
@@ -46,10 +44,6 @@ func (n testNodeConfig) FinalizedBlockPollInterval() time.Duration {
return n.finalizedBlockPollInterval
}

func (n testNodeConfig) Errors() config.ClientErrors {
return n.errors
}

type testNode struct {
*node[types.ID, Head, NodeClient[types.ID, Head]]
}