Commit 3a58c3d

Merge branch 'develop' into chain-reader-event-querying2

EasterTheBunny committed May 7, 2024
2 parents: 98cd914 + 700a827
Showing 276 changed files with 2,769 additions and 793 deletions.
6 changes: 6 additions & 0 deletions .changeset/funny-tips-promise.md
@@ -0,0 +1,6 @@
---
"chainlink": patch
---

#added
compare user-defined max gas price with current gas price in automation simulation pipeline
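
A minimal sketch of the check this changeset describes — the package, function, and parameter names are hypothetical, not the actual chainlink automation code:

```go
package gasguard // hypothetical package name

import "math/big"

// exceedsUserMaxGasPrice reports whether the node's current gas price is
// above the per-upkeep ceiling the user configured. In the simulation
// pipeline, an upkeep would be skipped when this returns true.
func exceedsUserMaxGasPrice(currentGasPrice, userMaxGasPrice *big.Int) bool {
	if userMaxGasPrice == nil {
		return false // no user-defined ceiling configured; nothing to enforce
	}
	return currentGasPrice.Cmp(userMaxGasPrice) > 0
}
```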
5 changes: 5 additions & 0 deletions .changeset/mighty-flies-breathe.md
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#added ORM and corresponding tables for CCIP gas prices and token prices
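
The changeset names new ORM tables, but the schema lives in migration files not shown in this diff. A hedged sketch of plausible row shapes (all names hypothetical):

```go
package ccipdb // hypothetical package name

import (
	"math/big"
	"time"
)

// GasPriceRow and TokenPriceRow sketch what the new tables might persist;
// the actual chainlink schema is defined elsewhere in this commit.
type GasPriceRow struct {
	SourceChainSelector uint64    // chain the price was observed on
	GasPrice            *big.Int  // observed gas price
	CreatedAt           time.Time // insertion timestamp
}

type TokenPriceRow struct {
	TokenAddr  string    // token contract address
	TokenPrice *big.Int  // observed token price
	CreatedAt  time.Time // insertion timestamp
}
```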
6 changes: 6 additions & 0 deletions .changeset/neat-pianos-argue.md
@@ -0,0 +1,6 @@
---
"chainlink": patch
---

#added
pass a gas estimator to registry 2.1 pipeline
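
A hedged sketch of the dependency-injection pattern this changeset describes — handing an estimator to the pipeline rather than having it fetch prices itself. The interface and names are assumptions, not the real registry 2.1 API:

```go
package registry21 // hypothetical package name

import (
	"context"
	"math/big"
)

// gasEstimator is a stand-in for whatever estimator interface the real
// pipeline accepts; only the injection pattern is the point here.
type gasEstimator interface {
	CurrentGasPrice(ctx context.Context) (*big.Int, error)
}

// pipeline receives the estimator at construction time, so the simulation
// step (see the max-gas-price changeset above) can query live prices.
type pipeline struct {
	estimator gasEstimator
}

func newPipeline(ge gasEstimator) *pipeline {
	return &pipeline{estimator: ge}
}
```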
5 changes: 5 additions & 0 deletions .changeset/ten-dodos-run.md
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal Normalize keystone workflow ref regex property to match id regex
5 changes: 5 additions & 0 deletions .changeset/tiny-rocks-shake.md
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal generate geth wrappers for capability registry remove nodes
5 changes: 5 additions & 0 deletions .changeset/witty-weeks-kneel.md
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#added an integration test for max gas price check
4 changes: 2 additions & 2 deletions .github/workflows/ci-core.yml
@@ -170,7 +170,7 @@ jobs:
env:
OUTPUT_FILE: ./output.txt
USE_TEE: false
CL_DATABASE_URL: ${{ env.DB_URL }}
CL_DATABASE_URL: ${{ env.DB_URL }}
run: ./tools/bin/${{ matrix.type.cmd }} ./...
- name: Print Filtered Test Results
if: ${{ failure() && matrix.type.cmd == 'go_core_tests' && needs.filter.outputs.changes == 'true' }}
@@ -203,7 +203,7 @@ jobs:
./coverage.txt
./postgres_logs.txt
- name: Notify Slack
if: ${{ failure() && steps.print-races.outputs.post_to_slack == 'true' && matrix.type.cmd == 'go_core_race_tests' && (github.event_name == 'merge_group' || github.base_ref == 'develop') && needs.filter.outputs.changes == 'true' }}
if: ${{ failure() && steps.print-races.outputs.post_to_slack == 'true' && matrix.type.cmd == 'go_core_race_tests' && (github.event_name == 'merge_group' || github.event.branch == 'develop') && needs.filter.outputs.changes == 'true' }}
uses: slackapi/slack-github-action@6c661ce58804a1a20f6dc5fbee7f0381b469e001 # v1.25.0
env:
SLACK_BOT_TOKEN: ${{ secrets.QA_SLACK_API_KEY }}
1 change: 1 addition & 0 deletions .golangci.yml
@@ -14,6 +14,7 @@ linters:
- sqlclosecheck
- noctx
- depguard
- whitespace
linters-settings:
exhaustive:
default-signifies-exhaustive: true
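
Enabling the `whitespace` linter is what drives the many single-blank-line deletions across the Go files in this merge: the linter reports unnecessary leading or trailing newlines inside blocks. A small illustration of the pattern it flags:

```go
package example

// flagged: the blank line before the closing brace is exactly the kind of
// trailing whitespace this commit removes throughout common/.
func bad() int {
	x := 1
	return x

}

// clean: no leading or trailing blank lines inside the block.
func good() int {
	x := 1
	return x
}
```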
3 changes: 0 additions & 3 deletions common/client/multi_node.go
@@ -239,7 +239,6 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
return rpc, err
}
return n.RPC(), nil

}

// selectNode returns the active Node, if it is still nodeStateAlive, otherwise it selects a new one from the NodeSelector.
@@ -641,7 +640,6 @@ loop:
// ignore critical error as it's reported in reportSendTxAnomalies
result, _ := aggregateTxResults(errorsByCode)
return result

}

func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT, BATCH_ELEM]) reportSendTxAnomalies(tx TX, txResults <-chan sendTxResult) {
@@ -759,7 +757,6 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP

c.wg.Add(1)
go c.reportSendTxAnomalies(tx, txResultsToReport)

})
if !ok {
return fmt.Errorf("aborted while broadcasting tx - multiNode is stopped: %w", context.Canceled)
3 changes: 0 additions & 3 deletions common/client/multi_node_test.go
@@ -373,7 +373,6 @@ func TestMultiNode_selectNode(t *testing.T) {
newActiveNode, err := mn.selectNode()
require.NoError(t, err)
require.Equal(t, prevActiveNode.String(), newActiveNode.String())

})
t.Run("Updates node if active is not healthy", func(t *testing.T) {
t.Parallel()
@@ -399,7 +398,6 @@ func TestMultiNode_selectNode(t *testing.T) {
newActiveNode, err := mn.selectNode()
require.NoError(t, err)
require.Equal(t, newBest.String(), newActiveNode.String())

})
t.Run("No active nodes - reports critical error", func(t *testing.T) {
t.Parallel()
@@ -418,7 +416,6 @@ func TestMultiNode_selectNode(t *testing.T) {
require.EqualError(t, err, ErroringNodeError.Error())
require.Nil(t, node)
tests.RequireLogMessage(t, observedLogs, "No live RPC nodes available")

})
}

1 change: 0 additions & 1 deletion common/client/node_fsm_test.go
@@ -118,7 +118,6 @@ func testTransition(t *testing.T, rpc *mockNodeClient[types.ID, Head], transitio
}, "Expected transition from `%s` to `%s` to panic", nodeState, destinationState)
m.AssertNotCalled(t)
assert.Equal(t, nodeState, node.State(), "Expected node to remain in initial state on invalid transition")

}
}

2 changes: 0 additions & 2 deletions common/client/node_lifecycle.go
@@ -259,7 +259,6 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
n.stateLatestFinalizedBlockNumber = latestFinalizedBN
}
}

}
}

@@ -524,6 +523,5 @@ func (n *node[CHAIN_ID, HEAD, RPC]) syncingLoop() {
n.declareAlive()
return
}

}
}
9 changes: 0 additions & 9 deletions common/client/node_lifecycle_test.go
@@ -39,7 +39,6 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
node.setState(nodeStateClosed)
node.wg.Add(1)
node.aliveLoop()

})
t.Run("if initial subscribe fails, transitions to unreachable", func(t *testing.T) {
t.Parallel()
@@ -58,7 +57,6 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
tests.AssertEventually(t, func() bool {
return node.State() == nodeStateUnreachable
})

})
t.Run("if remote RPC connection is closed transitions to unreachable", func(t *testing.T) {
t.Parallel()
@@ -150,7 +148,6 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
tests.AssertLogCountEventually(t, observedLogs, fmt.Sprintf("Poll failure, RPC endpoint %s failed to respond properly", node.String()), pollFailureThreshold)
tests.AssertLogCountEventually(t, observedLogs, "Version poll successful", 2)
assert.True(t, ensuredAlive.Load(), "expected to ensure that node was alive")

})
t.Run("with threshold poll failures, transitions to unreachable", func(t *testing.T) {
t.Parallel()
@@ -356,7 +353,6 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
node.declareAlive()
tests.AssertLogEventually(t, observedLogs, "Subscription channel unexpectedly closed")
assert.Equal(t, nodeStateUnreachable, node.State())

})
t.Run("updates block number and difficulty on new head", func(t *testing.T) {
t.Parallel()
@@ -859,7 +855,6 @@ func TestUnit_NodeLifecycle_unreachableLoop(t *testing.T) {
node.setState(nodeStateClosed)
node.wg.Add(1)
node.unreachableLoop()

})
t.Run("on failed redial, keeps trying", func(t *testing.T) {
t.Parallel()
@@ -1017,7 +1012,6 @@ func TestUnit_NodeLifecycle_invalidChainIDLoop(t *testing.T) {
node.setState(nodeStateClosed)
node.wg.Add(1)
node.invalidChainIDLoop()

})
t.Run("on invalid dial becomes unreachable", func(t *testing.T) {
t.Parallel()
@@ -1380,7 +1374,6 @@ func TestUnit_NodeLifecycle_syncStatus(t *testing.T) {
}
}
}

})
t.Run("total difficulty selection mode", func(t *testing.T) {
const syncThreshold = 10
@@ -1432,7 +1425,6 @@ func TestUnit_NodeLifecycle_syncStatus(t *testing.T) {
})
}
}

})
}

@@ -1453,7 +1445,6 @@ func TestUnit_NodeLifecycle_SyncingLoop(t *testing.T) {
node.setState(nodeStateClosed)
node.wg.Add(1)
node.syncingLoop()

})
t.Run("on invalid dial becomes unreachable", func(t *testing.T) {
t.Parallel()
8 changes: 5 additions & 3 deletions common/client/poller.go
@@ -27,10 +27,11 @@ type Poller[T any] struct {
wg sync.WaitGroup
}

// NewPoller creates a new Poller instance
// NewPoller creates a new Poller instance and returns a channel to receive the polled data
func NewPoller[
T any,
](pollingInterval time.Duration, pollingFunc func(ctx context.Context) (T, error), pollingTimeout time.Duration, channel chan<- T, logger logger.Logger) Poller[T] {
](pollingInterval time.Duration, pollingFunc func(ctx context.Context) (T, error), pollingTimeout time.Duration, logger logger.Logger) (Poller[T], <-chan T) {
channel := make(chan T)
return Poller[T]{
pollingInterval: pollingInterval,
pollingFunc: pollingFunc,
@@ -39,7 +40,7 @@ func NewPoller[
logger: logger,
errCh: make(chan error),
stopCh: make(chan struct{}),
}
}, channel
}

var _ types.Subscription = &Poller[any]{}
@@ -58,6 +59,7 @@ func (p *Poller[T]) Unsubscribe() {
close(p.stopCh)
p.wg.Wait()
close(p.errCh)
close(p.channel)
return nil
})
}
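
Per the diff above, NewPoller now allocates the data channel itself, returns it read-only alongside the Poller, and closes it during Unsubscribe. A usage sketch based on the new signature; it assumes the surrounding common/client test helpers (Head, logger.Test) and is not a standalone program:

```go
// Old: callers made and closed their own channel.
//   channel := make(chan Head, 1)
//   poller := NewPoller[Head](interval, pollFunc, timeout, channel, lggr)
//
// New: the channel is created inside NewPoller and returned read-only.
lggr := logger.Test(t)
pollFunc := func(ctx context.Context) (Head, error) { return nil, nil }

poller, ch := NewPoller[Head](time.Millisecond, pollFunc, time.Second, lggr)
require.NoError(t, poller.Start())
defer poller.Unsubscribe() // stops polling, then closes ch

_ = <-ch // receive polled heads until Unsubscribe closes the channel
```

Returning `<-chan T` keeps channel ownership with the poller: only the sender closes it, which removes the double-close hazard the old "unsubscribe with closed channel" test had to exercise.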
40 changes: 9 additions & 31 deletions common/client/poller_test.go
@@ -23,10 +23,7 @@ func Test_Poller(t *testing.T) {
return nil, nil
}

channel := make(chan Head, 1)
defer close(channel)

poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, lggr)
poller, _ := NewPoller[Head](time.Millisecond, pollFunc, time.Second, lggr)
err := poller.Start()
require.NoError(t, err)

@@ -50,12 +47,8 @@
return h.ToMockHead(t), nil
}

// data channel to receive updates from the poller
channel := make(chan Head, 1)
defer close(channel)

// Create poller and start to receive data
poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, lggr)
poller, channel := NewPoller[Head](time.Millisecond, pollFunc, time.Second, lggr)
require.NoError(t, poller.Start())
defer poller.Unsubscribe()

@@ -79,14 +72,10 @@
return nil, fmt.Errorf("polling error %d", pollNumber)
}

// data channel to receive updates from the poller
channel := make(chan Head, 1)
defer close(channel)

olggr, observedLogs := logger.TestObserved(t, zap.WarnLevel)

// Create poller and subscribe to receive data
poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, olggr)
poller, _ := NewPoller[Head](time.Millisecond, pollFunc, time.Second, olggr)
require.NoError(t, poller.Start())
defer poller.Unsubscribe()

@@ -114,14 +103,10 @@
// Set instant timeout
pollingTimeout := time.Duration(0)

// data channel to receive updates from the poller
channel := make(chan Head, 1)
defer close(channel)

olggr, observedLogs := logger.TestObserved(t, zap.WarnLevel)

// Create poller and subscribe to receive data
poller := NewPoller[Head](time.Millisecond, pollFunc, pollingTimeout, channel, olggr)
poller, _ := NewPoller[Head](time.Millisecond, pollFunc, pollingTimeout, olggr)
require.NoError(t, poller.Start())
defer poller.Unsubscribe()

@@ -146,14 +131,10 @@
// Set long timeout
pollingTimeout := time.Minute

// data channel to receive updates from the poller
channel := make(chan Head, 1)
defer close(channel)

olggr, observedLogs := logger.TestObserved(t, zap.WarnLevel)

// Create poller and subscribe to receive data
poller := NewPoller[Head](time.Millisecond, pollFunc, pollingTimeout, channel, olggr)
poller, _ := NewPoller[Head](time.Millisecond, pollFunc, pollingTimeout, olggr)
require.NoError(t, poller.Start())

// Unsubscribe while blocked in polling function
@@ -184,8 +165,7 @@ func Test_Poller_Unsubscribe(t *testing.T) {
}

t.Run("Test multiple unsubscribe", func(t *testing.T) {
channel := make(chan Head, 1)
poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, lggr)
poller, channel := NewPoller[Head](time.Millisecond, pollFunc, time.Second, lggr)
err := poller.Start()
require.NoError(t, err)

Expand All @@ -194,14 +174,12 @@ func Test_Poller_Unsubscribe(t *testing.T) {
poller.Unsubscribe()
})

t.Run("Test unsubscribe with closed channel", func(t *testing.T) {
channel := make(chan Head, 1)
poller := NewPoller[Head](time.Millisecond, pollFunc, time.Second, channel, lggr)
t.Run("Read channel after unsubscribe", func(t *testing.T) {
poller, channel := NewPoller[Head](time.Millisecond, pollFunc, time.Second, lggr)
err := poller.Start()
require.NoError(t, err)

<-channel
close(channel)
poller.Unsubscribe()
require.Equal(t, <-channel, nil)
})
}
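
The renamed "Read channel after unsubscribe" test relies on Go's closed-channel semantics: once Unsubscribe closes the channel, a receive returns immediately with the element type's zero value, which for an interface type such as Head is nil. The same behavior in isolation:

```go
package main

import "fmt"

func main() {
	ch := make(chan error) // error is an interface type, like Head above
	close(ch)
	v, ok := <-ch      // receive on a closed channel does not block
	fmt.Println(v, ok) // prints: <nil> false
}
```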
1 change: 0 additions & 1 deletion common/internal/utils/utils.go
@@ -17,7 +17,6 @@ func NewRedialBackoff() backoff.Backoff {
Max: 15 * time.Second,
Jitter: true,
}

}

// MinFunc returns the minimum value of the given element array with respect
1 change: 0 additions & 1 deletion common/txmgr/broadcaster.go
@@ -598,7 +598,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
// trying to send the transaction over again.
return fmt.Errorf("retryable error while sending transaction %s (tx ID %d): %w", attempt.Hash.String(), etx.ID, err), true
}

}

// Finds next transaction in the queue, assigns a sequence, and moves it to "in_progress" state ready for broadcast.
3 changes: 0 additions & 3 deletions common/txmgr/confirmer.go
@@ -814,7 +814,6 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) bum

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) handleInProgressAttempt(ctx context.Context, lggr logger.SugaredLogger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], blockHeight int64) error {
if attempt.State != txmgrtypes.TxAttemptInProgress {

return fmt.Errorf("invariant violation: expected tx_attempt %v to be in_progress, it was %s", attempt.ID, attempt.State)
}

@@ -1049,7 +1048,6 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) For
ec.lggr.Infof("ForceRebroadcast: will rebroadcast transactions for all sequences between %v and %v", seqs[0], seqs[len(seqs)-1])

for _, seq := range seqs {

etx, err := ec.txStore.FindTxWithSequence(ctx, address, seq)
if err != nil {
return fmt.Errorf("ForceRebroadcast failed: %w", err)
@@ -1098,7 +1096,6 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sen

// ResumePendingTaskRuns issues callbacks to task runs that are pending waiting for receipts
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, head types.Head[BLOCK_HASH]) error {

receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, head.BlockNumber(), ec.chainID)

if err != nil {
5 changes: 5 additions & 0 deletions contracts/.changeset/stupid-horses-promise.md
@@ -0,0 +1,5 @@
---
"@chainlink/contracts": patch
---

implement remove nodes on capability registry
(Diff view truncated: only the first of the 276 changed files are shown above.)