
Merge branch 'develop' into utils_extraction
dimriou committed Nov 28, 2023
2 parents d5b3da0 + bd7e233 commit 2ed9da2
Showing 178 changed files with 2,380 additions and 1,209 deletions.
common/client/multi_node.go: 37 changes (19 additions & 18 deletions)
@@ -12,13 +12,13 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink-common/pkg/assets"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink/v2/common/config"
feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

var (
@@ -87,7 +87,7 @@ type multiNode[
sendonlys []SendOnlyNode[CHAIN_ID, RPC_CLIENT]
chainID CHAIN_ID
chainType config.ChainType
logger logger.Logger
lggr logger.Logger
selectionMode string
noNewHeadsThreshold time.Duration
nodeSelector NodeSelector[CHAIN_ID, HEAD, RPC_CLIENT]
@@ -119,7 +119,7 @@ func NewMultiNode[
HEAD types.Head[BLOCK_HASH],
RPC_CLIENT RPC[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD],
](
logger logger.Logger,
l logger.Logger,
selectionMode string,
leaseDuration time.Duration,
noNewHeadsThreshold time.Duration,
@@ -132,7 +132,8 @@
) MultiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT] {
nodeSelector := newNodeSelector(selectionMode, nodes)

lggr := logger.Named("MultiNode").With("chainID", chainID.String())
lggr := logger.Named(l, "MultiNode")
lggr = logger.With(lggr, "chainID", chainID.String())

// Prometheus' default interval is 15s, set this to under 7.5s to avoid
// aliasing (see: https://en.wikipedia.org/wiki/Nyquist_frequency)
@@ -142,7 +143,7 @@
sendonlys: sendonlys,
chainID: chainID,
chainType: chainType,
logger: lggr,
lggr: lggr,
selectionMode: selectionMode,
noNewHeadsThreshold: noNewHeadsThreshold,
nodeSelector: nodeSelector,
@@ -153,7 +154,7 @@
reportInterval: reportInterval,
}

c.logger.Debugf("The MultiNode is configured to use NodeSelectionMode: %s", selectionMode)
c.lggr.Debugf("The MultiNode is configured to use NodeSelectionMode: %s", selectionMode)

return c
}
@@ -197,11 +198,11 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
go c.runLoop()

if c.leaseDuration.Seconds() > 0 && c.selectionMode != NodeSelectionModeRoundRobin {
c.logger.Infof("The MultiNode will switch to best node every %s", c.leaseDuration.String())
c.lggr.Infof("The MultiNode will switch to best node every %s", c.leaseDuration.String())
c.wg.Add(1)
go c.checkLeaseLoop()
} else {
c.logger.Info("Best node switching is disabled")
c.lggr.Info("Best node switching is disabled")
}

return nil
@@ -249,7 +250,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
c.activeNode = c.nodeSelector.Select()

if c.activeNode == nil {
c.logger.Criticalw("No live RPC nodes available", "NodeSelectionMode", c.nodeSelector.Name())
logger.Criticalw(c.lggr, "No live RPC nodes available", "NodeSelectionMode", c.nodeSelector.Name())
errmsg := fmt.Errorf("no live nodes available for chain %s", c.chainID.String())
c.SvcErrBuffer.Append(errmsg)
err = ErroringNodeError
@@ -282,7 +283,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
// Terminate client subscriptions. Services are responsible for reconnecting, which will be routed to the new
// best node. Only terminate connections with more than 1 subscription to account for the aliveLoop subscription
if n.State() == nodeStateAlive && n != bestNode && n.SubscribersCount() > 1 {
c.logger.Infof("Switching to best node from %q to %q", n.String(), bestNode.String())
c.lggr.Infof("Switching to best node from %q to %q", n.String(), bestNode.String())
n.UnsubscribeAllExceptAliveLoop()
}
}
@@ -351,13 +352,13 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
}

live := total - dead
c.logger.Tracew(fmt.Sprintf("MultiNode state: %d/%d nodes are alive", live, total), "nodeStates", nodeStates)
logger.Tracew(c.lggr, fmt.Sprintf("MultiNode state: %d/%d nodes are alive", live, total), "nodeStates", nodeStates)
if total == dead {
rerr := fmt.Errorf("no primary nodes available: 0/%d nodes are alive", total)
c.logger.Criticalw(rerr.Error(), "nodeStates", nodeStates)
logger.Criticalw(c.lggr, rerr.Error(), "nodeStates", nodeStates)
c.SvcErrBuffer.Append(rerr)
} else if dead > 0 {
c.logger.Errorw(fmt.Sprintf("At least one primary node is dead: %d/%d nodes are alive", live, total), "nodeStates", nodeStates)
c.lggr.Errorw(fmt.Sprintf("At least one primary node is dead: %d/%d nodes are alive", live, total), "nodeStates", nodeStates)
}
}

@@ -403,9 +404,9 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
defer wg.Done()
err := n.RPC().BatchCallContext(ctx, b)
if err != nil {
c.logger.Debugw("Secondary node BatchCallContext failed", "err", err)
c.lggr.Debugw("Secondary node BatchCallContext failed", "err", err)
} else {
c.logger.Trace("Secondary node BatchCallContext success")
logger.Trace(c.lggr, "Secondary node BatchCallContext success")
}
}(n)
}
@@ -572,15 +573,15 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
defer c.wg.Done()

txErr := n.RPC().SendTransaction(ctx, tx)
c.logger.Debugw("Sendonly node sent transaction", "name", n.String(), "tx", tx, "err", txErr)
c.lggr.Debugw("Sendonly node sent transaction", "name", n.String(), "tx", tx, "err", txErr)
sendOnlyError := c.sendOnlyErrorParser(txErr)
if sendOnlyError != Successful {
c.logger.Warnw("RPC returned error", "name", n.String(), "tx", tx, "err", txErr)
c.lggr.Warnw("RPC returned error", "name", n.String(), "tx", tx, "err", txErr)
}
}(n)
})
if !ok {
c.logger.Debug("Cannot send transaction on sendonly node; MultiNode is stopped", "node", n.String())
c.lggr.Debug("Cannot send transaction on sendonly node; MultiNode is stopped", "node", n.String())
}
}
if nodeError != nil {
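The multi_node.go hunks above all follow one pattern: the logger now comes from chainlink-common instead of core/logger, the struct field is renamed from logger to lggr, and the Trace/Critical levels are called through package-level helpers that take the Logger as their first argument, while the levels on the Logger interface (Debugw, Errorw, Infof, ...) stay as methods. A minimal sketch of that pattern, not part of this commit; it uses only the calls visible in the diff, and the names poolReporter, newPoolReporter, and report are illustrative:

package client

import (
	"fmt"

	"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

type poolReporter struct {
	lggr logger.Logger // field renamed from "logger" so it does not shadow the package
}

// newPoolReporter mirrors how NewMultiNode now derives its logger:
// Named and With are package functions in chainlink-common, not methods.
func newPoolReporter(l logger.Logger, chainID string) *poolReporter {
	lggr := logger.Named(l, "MultiNode")
	lggr = logger.With(lggr, "chainID", chainID)
	return &poolReporter{lggr: lggr}
}

// report mirrors the reporting in runLoop: Trace and Critical go through the
// package helpers, while Errorw remains a method on the Logger interface.
func (r *poolReporter) report(live, total int, nodeStates map[string]string) {
	logger.Tracew(r.lggr, fmt.Sprintf("MultiNode state: %d/%d nodes are alive", live, total), "nodeStates", nodeStates)
	if live == 0 {
		logger.Criticalw(r.lggr, "no primary nodes available", "nodeStates", nodeStates)
		return
	}
	if live < total {
		r.lggr.Errorw(fmt.Sprintf("At least one primary node is dead: %d/%d nodes are alive", live, total), "nodeStates", nodeStates)
	}
}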
common/client/multi_node_test.go: 20 changes (10 additions & 10 deletions)
@@ -13,11 +13,11 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink/v2/common/config"
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

type multiNodeRPCClient RPC[types.ID, *big.Int, Hashable, Hashable, any, Hashable, any, any,
@@ -43,7 +43,7 @@ type multiNodeOpts struct {

func newTestMultiNode(t *testing.T, opts multiNodeOpts) testMultiNode {
if opts.logger == nil {
opts.logger = logger.TestLogger(t)
opts.logger = logger.Test(t)
}

result := NewMultiNode[types.ID, *big.Int, Hashable, Hashable, any, Hashable, any, any,
@@ -211,7 +211,7 @@ func TestMultiNode_Report(t *testing.T) {
chainID := types.RandomID()
node1 := newHealthyNode(t, chainID)
node2 := newNodeWithState(t, chainID, nodeStateOutOfSync)
lggr, observedLogs := logger.TestLoggerObserved(t, zap.WarnLevel)
lggr, observedLogs := logger.TestObserved(t, zap.WarnLevel)
mn := newTestMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: chainID,
@@ -228,7 +228,7 @@
t.Parallel()
chainID := types.RandomID()
node := newNodeWithState(t, chainID, nodeStateOutOfSync)
lggr, observedLogs := logger.TestLoggerObserved(t, zap.WarnLevel)
lggr, observedLogs := logger.TestObserved(t, zap.WarnLevel)
mn := newTestMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: chainID,
@@ -252,7 +252,7 @@ func TestMultiNode_CheckLease(t *testing.T) {
t.Parallel()
chainID := types.RandomID()
node := newHealthyNode(t, chainID)
lggr, observedLogs := logger.TestLoggerObserved(t, zap.InfoLevel)
lggr, observedLogs := logger.TestObserved(t, zap.InfoLevel)
mn := newTestMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: chainID,
@@ -268,7 +268,7 @@
t.Parallel()
chainID := types.RandomID()
node := newHealthyNode(t, chainID)
lggr, observedLogs := logger.TestLoggerObserved(t, zap.InfoLevel)
lggr, observedLogs := logger.TestObserved(t, zap.InfoLevel)
mn := newTestMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeHighestHead,
chainID: chainID,
@@ -290,7 +290,7 @@
bestNode := newHealthyNode(t, chainID)
nodeSelector := newMockNodeSelector[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
nodeSelector.On("Select").Return(bestNode)
lggr, observedLogs := logger.TestLoggerObserved(t, zap.InfoLevel)
lggr, observedLogs := logger.TestObserved(t, zap.InfoLevel)
mn := newTestMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeHighestHead,
chainID: chainID,
@@ -402,7 +402,7 @@ func TestMultiNode_selectNode(t *testing.T) {
t.Run("No active nodes - reports critical error", func(t *testing.T) {
t.Parallel()
chainID := types.RandomID()
lggr, observedLogs := logger.TestLoggerObserved(t, zap.InfoLevel)
lggr, observedLogs := logger.TestObserved(t, zap.InfoLevel)
mn := newTestMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: chainID,
@@ -541,7 +541,7 @@ func TestMultiNode_BatchCallContextAll(t *testing.T) {
mainNode.On("RPC").Return(okRPC)
nodeSelector := newMockNodeSelector[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
nodeSelector.On("Select").Return(mainNode).Once()
lggr, observedLogs := logger.TestLoggerObserved(t, zap.DebugLevel)
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
mn := newTestMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: types.RandomID(),
@@ -610,7 +610,7 @@ func TestMultiNode_SendTransaction(t *testing.T) {
mainNode.On("RPC").Return(okRPC)
nodeSelector := newMockNodeSelector[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
nodeSelector.On("Select").Return(mainNode).Once()
lggr, observedLogs := logger.TestLoggerObserved(t, zap.DebugLevel)
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
mn := newTestMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: types.RandomID(),
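In the tests, the core/logger constructors TestLogger and TestLoggerObserved are replaced by chainlink-common's Test and TestObserved. A hedged sketch of how a test builds both loggers after this change; the test name and body are illustrative, only the constructor calls are taken from the diff:

package client

import (
	"testing"

	"go.uber.org/zap"

	"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

func TestLoggerConstruction(t *testing.T) {
	t.Parallel()

	// Plain test logger, previously logger.TestLogger(t) from core/logger.
	lggr := logger.Test(t)
	lggr.Debug("test logger ready")

	// Observed logger, previously logger.TestLoggerObserved(t, zap.WarnLevel);
	// observedLogs can be asserted on with testify, as in the tests above.
	obsLggr, observedLogs := logger.TestObserved(t, zap.WarnLevel)
	obsLggr.Warnw("something to observe", "key", "value")
	if observedLogs.Len() == 0 {
		t.Fatal("expected at least one observed log entry")
	}
}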
common/client/node.go: 7 changes (4 additions & 3 deletions)
@@ -12,10 +12,10 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"

"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

const QueryTimeout = 10 * time.Second
@@ -135,14 +135,15 @@ func NewNode[
n.http = httpuri
}
n.nodeCtx, n.cancelNodeCtx = context.WithCancel(context.Background())
lggr = lggr.Named("Node").With(
lggr = logger.Named(lggr, "Node")
lggr = logger.With(lggr,
"nodeTier", Primary.String(),
"nodeName", name,
"node", n.String(),
"chainID", chainID,
"nodeOrder", n.order,
)
n.lfcLog = lggr.Named("Lifecycle")
n.lfcLog = logger.Named(lggr, "Lifecycle")
n.stateLatestBlockNumber = -1
n.rpc = rpc
n.chainFamily = chainFamily
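The node.go hunk shows the same migration where NewNode builds its loggers: Named and With are package functions, and the child "Lifecycle" logger is derived from the annotated "Node" logger. A small sketch of that derivation, assuming only the chainlink-common calls shown in the hunk; the surrounding struct, field names, and the literal "primary" tier value are illustrative:

package client

import (
	"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

// nodeLoggers is an illustrative holder for the two loggers NewNode sets up.
type nodeLoggers struct {
	lggr   logger.Logger // "Node" logger carrying node-identifying fields
	lfcLog logger.Logger // child "Lifecycle" logger used by the lifecycle loops
}

func newNodeLoggers(base logger.Logger, name, chainID string, order int32) nodeLoggers {
	lggr := logger.Named(base, "Node")
	lggr = logger.With(lggr,
		"nodeTier", "primary",
		"nodeName", name,
		"chainID", chainID,
		"nodeOrder", order,
	)
	return nodeLoggers{
		lggr:   lggr,
		lfcLog: logger.Named(lggr, "Lifecycle"),
	}
}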
common/client/node_lifecycle.go: 33 changes (18 additions & 15 deletions)
@@ -11,7 +11,9 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
bigmath "github.com/smartcontractkit/chainlink-common/pkg/utils/big_math"

"github.com/smartcontractkit/chainlink/v2/core/utils"
)

@@ -90,8 +92,9 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
pollFailureThreshold := n.nodePoolCfg.PollFailureThreshold()
pollInterval := n.nodePoolCfg.PollInterval()

lggr := n.lfcLog.Named("Alive").With("noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold, "pollInterval", pollInterval, "pollFailureThreshold", pollFailureThreshold)
lggr.Tracew("Alive loop starting", "nodeState", n.State())
lggr := logger.Named(n.lfcLog, "Alive")
lggr = logger.With(lggr, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold, "pollInterval", pollInterval, "pollFailureThreshold", pollFailureThreshold)
logger.Tracew(lggr, "Alive loop starting", "nodeState", n.State())

headsC := make(chan HEAD)
sub, err := n.rpc.Subscribe(n.nodeCtx, headsC, rpcSubscriptionMethodNewHeads)
@@ -142,7 +145,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
case <-pollCh:
var version string
promPoolRPCNodePolls.WithLabelValues(n.chainID.String(), n.name).Inc()
lggr.Tracew("Polling for version", "nodeState", n.State(), "pollFailures", pollFailures)
logger.Tracew(lggr, "Polling for version", "nodeState", n.State(), "pollFailures", pollFailures)
ctx, cancel := context.WithTimeout(n.nodeCtx, pollInterval)
version, err := n.RPC().ClientVersion(ctx)
cancel()
@@ -162,7 +165,7 @@
lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.State())
if n.nLiveNodes != nil {
if l, _, _ := n.nLiveNodes(); l < 2 {
lggr.Criticalf("RPC endpoint failed to respond to polls; %s %s", msgCannotDisable, msgDegradedState)
logger.Criticalf(lggr, "RPC endpoint failed to respond to polls; %s %s", msgCannotDisable, msgDegradedState)
continue
}
}
@@ -174,7 +177,7 @@
// note: there must be another live node for us to be out of sync
lggr.Errorw("RPC endpoint has fallen behind", "blockNumber", num, "totalDifficulty", td, "nodeState", n.State())
if liveNodes < 2 {
lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState)
logger.Criticalf(lggr, "RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState)
continue
}
n.declareOutOfSync(n.isOutOfSync)
@@ -187,13 +190,13 @@
return
}
promPoolRPCNodeNumSeenBlocks.WithLabelValues(n.chainID.String(), n.name).Inc()
lggr.Tracew("Got head", "head", bh)
logger.Tracew(lggr, "Got head", "head", bh)
if bh.BlockNumber() > highestReceivedBlockNumber {
promPoolRPCNodeHighestSeenBlock.WithLabelValues(n.chainID.String(), n.name).Set(float64(bh.BlockNumber()))
lggr.Tracew("Got higher block number, resetting timer", "latestReceivedBlockNumber", highestReceivedBlockNumber, "blockNumber", bh.BlockNumber(), "nodeState", n.State())
logger.Tracew(lggr, "Got higher block number, resetting timer", "latestReceivedBlockNumber", highestReceivedBlockNumber, "blockNumber", bh.BlockNumber(), "nodeState", n.State())
highestReceivedBlockNumber = bh.BlockNumber()
} else {
lggr.Tracew("Ignoring previously seen block number", "latestReceivedBlockNumber", highestReceivedBlockNumber, "blockNumber", bh.BlockNumber(), "nodeState", n.State())
logger.Tracew(lggr, "Ignoring previously seen block number", "latestReceivedBlockNumber", highestReceivedBlockNumber, "blockNumber", bh.BlockNumber(), "nodeState", n.State())
}
if outOfSyncT != nil {
outOfSyncT.Reset(noNewHeadsTimeoutThreshold)
@@ -209,7 +212,7 @@
lggr.Errorw(fmt.Sprintf("RPC endpoint detected out of sync; no new heads received for %s (last head received was %v)", noNewHeadsTimeoutThreshold, highestReceivedBlockNumber), "nodeState", n.State(), "latestReceivedBlockNumber", highestReceivedBlockNumber, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold)
if n.nLiveNodes != nil {
if l, _, _ := n.nLiveNodes(); l < 2 {
lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState)
logger.Criticalf(lggr, "RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState)
// We don't necessarily want to wait the full timeout to check again, we should
// check regularly and log noisily in this state
outOfSyncT.Reset(zombieNodeCheckInterval(n.noNewHeadsThreshold))
@@ -275,7 +278,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(isOutOfSync func(num int64, td

outOfSyncAt := time.Now()

lggr := n.lfcLog.Named("OutOfSync")
lggr := logger.Named(n.lfcLog, "OutOfSync")
lggr.Debugw("Trying to revive out-of-sync RPC node", "nodeState", n.State())

// Need to redial since out-of-sync nodes are automatically disconnected
@@ -292,7 +295,7 @@
return
}

lggr.Tracew("Successfully subscribed to heads feed on out-of-sync RPC node", "nodeState", n.State())
logger.Tracew(lggr, "Successfully subscribed to heads feed on out-of-sync RPC node", "nodeState", n.State())

ch := make(chan HEAD)
sub, err := n.rpc.Subscribe(n.nodeCtx, ch, rpcSubscriptionMethodNewHeads)
@@ -324,7 +327,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(isOutOfSync func(num int64, td
case <-time.After(zombieNodeCheckInterval(n.noNewHeadsThreshold)):
if n.nLiveNodes != nil {
if l, _, _ := n.nLiveNodes(); l < 1 {
lggr.Critical("RPC endpoint is still out of sync, but there are no other available nodes. This RPC node will be forcibly moved back into the live pool in a degraded state")
logger.Critical(lggr, "RPC endpoint is still out of sync, but there are no other available nodes. This RPC node will be forcibly moved back into the live pool in a degraded state")
n.declareInSync()
return
}
@@ -354,7 +357,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() {

unreachableAt := time.Now()

lggr := n.lfcLog.Named("Unreachable")
lggr := logger.Named(n.lfcLog, "Unreachable")
lggr.Debugw("Trying to revive unreachable RPC node", "nodeState", n.State())

dialRetryBackoff := utils.NewRedialBackoff()
@@ -364,7 +367,7 @@
case <-n.nodeCtx.Done():
return
case <-time.After(dialRetryBackoff.Duration()):
lggr.Tracew("Trying to re-dial RPC node", "nodeState", n.State())
logger.Tracew(lggr, "Trying to re-dial RPC node", "nodeState", n.State())

err := n.rpc.Dial(n.nodeCtx)
if err != nil {
@@ -410,7 +413,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) invalidChainIDLoop() {

invalidAt := time.Now()

lggr := n.lfcLog.Named("InvalidChainID")
lggr := logger.Named(n.lfcLog, "InvalidChainID")
lggr.Debugw(fmt.Sprintf("Periodically re-checking RPC node %s with invalid chain ID", n.String()), "nodeState", n.State())

chainIDRecheckBackoff := utils.NewRedialBackoff()
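node_lifecycle.go repeats the pattern inside each loop: the loop derives a named child logger from n.lfcLog and then routes Trace and Critical through the package-level helpers, while Debugw and Errorw stay as interface methods. A hedged sketch of one such loop body; reviveLoop, the dial callback, and the fixed retry interval are simplified placeholders, not the actual node state machine, which uses utils.NewRedialBackoff():

package client

import (
	"context"
	"time"

	"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

// reviveLoop is an illustrative stand-in for unreachableLoop: it shows only
// how the migrated logger calls are mixed, not the real lifecycle logic.
func reviveLoop(ctx context.Context, lfcLog logger.Logger, dial func(context.Context) error, liveNodes func() int) {
	lggr := logger.Named(lfcLog, "Unreachable")
	lggr.Debugw("Trying to revive unreachable RPC node")

	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(5 * time.Second): // placeholder for the redial backoff
			logger.Tracew(lggr, "Trying to re-dial RPC node")
			if err := dial(ctx); err != nil {
				lggr.Errorw("Failed to redial RPC node", "err", err)
				if liveNodes() < 2 {
					logger.Criticalf(lggr, "RPC endpoint is unreachable; %s", "node cannot be disabled")
				}
				continue
			}
			return
		}
	}
}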