Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Gateway] Better logging around node connectivity #11929

Merged
merged 1 commit into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions core/services/gateway/connectionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var promHeartbeatsSent = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "gateway_heartbeats_sent",
Help: "Metric to track the number of successful node heartbeates per DON",
var promKeepalivesSent = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "gateway_keepalives_sent",
Help: "Metric to track the number of successful keepalive ping messages per DON",
}, []string{"don_id"})

// ConnectionManager holds all connections between Gateway and Nodes.
Expand Down Expand Up @@ -77,6 +77,7 @@ type donConnectionManager struct {
}

type nodeState struct {
name string
conn network.WSConnectionWrapper
}

Expand Down Expand Up @@ -107,10 +108,14 @@ func NewConnectionManager(gwConfig *config.GatewayConfig, clock utils.Clock, lgg
if ok {
return nil, fmt.Errorf("duplicate node address %s in DON %s", nodeAddress, donConfig.DonId)
}
nodes[nodeAddress] = &nodeState{conn: network.NewWSConnectionWrapper(lggr)}
if nodes[nodeAddress].conn == nil {
connWrapper := network.NewWSConnectionWrapper(lggr)
if connWrapper == nil {
return nil, fmt.Errorf("error creating WSConnectionWrapper for node %s", nodeAddress)
}
nodes[nodeAddress] = &nodeState{
name: nodeConfig.Name,
conn: connWrapper,
}
}
dons[donConfig.DonId] = &donConnectionManager{
donConfig: &donConfig,
Expand Down Expand Up @@ -148,7 +153,7 @@ func (m *connectionManager) Start(ctx context.Context) error {
go donConnMgr.readLoop(nodeAddress, nodeState)
}
donConnMgr.closeWait.Add(1)
go donConnMgr.heartbeatLoop(m.config.HeartbeatIntervalSec)
go donConnMgr.keepaliveLoop(m.config.HeartbeatIntervalSec)
}
return m.wsServer.Start(ctx)
})
Expand Down Expand Up @@ -231,7 +236,7 @@ func (m *connectionManager) FinalizeHandshake(attemptId string, response []byte,
}
if conn != nil {
conn.SetPongHandler(func(data string) error {
m.lggr.Debugw("received heartbeat pong from node", "nodeAddress", attempt.nodeAddress)
m.lggr.Debugw("received keepalive pong from node", "nodeAddress", attempt.nodeAddress)
return nil
})
}
Expand Down Expand Up @@ -299,34 +304,34 @@ func (m *donConnectionManager) readLoop(nodeAddress string, nodeState *nodeState
}
}

func (m *donConnectionManager) heartbeatLoop(intervalSec uint32) {
func (m *donConnectionManager) keepaliveLoop(intervalSec uint32) {
ctx, _ := m.shutdownCh.NewCtx()
defer m.closeWait.Done()

if intervalSec == 0 {
m.lggr.Error("heartbeat interval is 0, heartbeat disabled")
m.lggr.Errorw("keepalive interval is 0, keepalive disabled", "donID", m.donConfig.DonId)
return
}
m.lggr.Info("starting heartbeat loop")
m.lggr.Infow("starting keepalive loop", "donID", m.donConfig.DonId)

ticker := time.NewTicker(time.Duration(intervalSec) * time.Second)
defer ticker.Stop()
keepaliveTicker := time.NewTicker(time.Duration(intervalSec) * time.Second)
defer keepaliveTicker.Stop()

for {
select {
case <-m.shutdownCh:
return
case <-ticker.C:
case <-keepaliveTicker.C:
errorCount := 0
for nodeAddress, nodeState := range m.nodes {
err := nodeState.conn.Write(ctx, websocket.PingMessage, []byte{})
if err != nil {
m.lggr.Debugw("unable to send heartbeat to node", "nodeAddress", nodeAddress, "err", err)
m.lggr.Debugw("unable to send keepalive ping to node", "nodeAddress", nodeAddress, "name", nodeState.name, "donID", m.donConfig.DonId, "err", err)
errorCount++
}
}
promHeartbeatsSent.WithLabelValues(m.donConfig.DonId).Set(float64(len(m.nodes) - errorCount))
m.lggr.Infow("sent heartbeat to nodes", "donID", m.donConfig.DonId, "errCount", errorCount)
promKeepalivesSent.WithLabelValues(m.donConfig.DonId).Set(float64(len(m.nodes) - errorCount))
m.lggr.Infow("sent keepalive pings to nodes", "donID", m.donConfig.DonId, "errCount", errorCount)
}
}
}
16 changes: 16 additions & 0 deletions core/services/gateway/connectionmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,19 @@ func TestConnectionManager_SendToNode_Failures(t *testing.T) {
err = donMgr.SendToNode(testutils.Context(t), "some_other_node", message)
require.Error(t, err)
}

func TestConnectionManager_CleanStartClose(t *testing.T) {
t.Parallel()

config, _ := newTestConfig(t, 2)
config.ConnectionManagerConfig.HeartbeatIntervalSec = 1
clock := utils.NewFixedClock(time.Now())
mgr, err := gateway.NewConnectionManager(config, clock, logger.TestLogger(t))
require.NoError(t, err)

err = mgr.Start(testutils.Context(t))
require.NoError(t, err)

err = mgr.Close()
require.NoError(t, err)
}
Loading