Skip to content

Commit

Permalink
[Gateway] Better logging around node connectivity
Browse files Browse the repository at this point in the history
1. Rename "heartbeat" -> "keepalive" to avoid confusion with other types of heartbeats.
2. Log node name alongside failed pings for easier debugging.
  • Loading branch information
bolekk committed Feb 6, 2024
1 parent 8b17f45 commit 3c78b78
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 16 deletions.
37 changes: 21 additions & 16 deletions core/services/gateway/connectionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var promHeartbeatsSent = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "gateway_heartbeats_sent",
Help: "Metric to track the number of successful node heartbeates per DON",
var promKeepalivesSent = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "gateway_keepalives_sent",
Help: "Metric to track the number of successful keepalive ping messages per DON",
}, []string{"don_id"})

// ConnectionManager holds all connections between Gateway and Nodes.
Expand Down Expand Up @@ -77,6 +77,7 @@ type donConnectionManager struct {
}

type nodeState struct {
name string
conn network.WSConnectionWrapper
}

Expand Down Expand Up @@ -107,10 +108,14 @@ func NewConnectionManager(gwConfig *config.GatewayConfig, clock utils.Clock, lgg
if ok {
return nil, fmt.Errorf("duplicate node address %s in DON %s", nodeAddress, donConfig.DonId)
}
nodes[nodeAddress] = &nodeState{conn: network.NewWSConnectionWrapper(lggr)}
if nodes[nodeAddress].conn == nil {
connWrapper := network.NewWSConnectionWrapper(lggr)
if connWrapper == nil {
return nil, fmt.Errorf("error creating WSConnectionWrapper for node %s", nodeAddress)
}
nodes[nodeAddress] = &nodeState{
name: nodeConfig.Name,
conn: connWrapper,
}
}
dons[donConfig.DonId] = &donConnectionManager{
donConfig: &donConfig,
Expand Down Expand Up @@ -148,7 +153,7 @@ func (m *connectionManager) Start(ctx context.Context) error {
go donConnMgr.readLoop(nodeAddress, nodeState)
}
donConnMgr.closeWait.Add(1)
go donConnMgr.heartbeatLoop(m.config.HeartbeatIntervalSec)
go donConnMgr.keepaliveLoop(m.config.HeartbeatIntervalSec)
}
return m.wsServer.Start(ctx)
})
Expand Down Expand Up @@ -231,7 +236,7 @@ func (m *connectionManager) FinalizeHandshake(attemptId string, response []byte,
}
if conn != nil {
conn.SetPongHandler(func(data string) error {
m.lggr.Debugw("received heartbeat pong from node", "nodeAddress", attempt.nodeAddress)
m.lggr.Debugw("received keepalive pong from node", "nodeAddress", attempt.nodeAddress)
return nil
})
}
Expand Down Expand Up @@ -299,34 +304,34 @@ func (m *donConnectionManager) readLoop(nodeAddress string, nodeState *nodeState
}
}

func (m *donConnectionManager) heartbeatLoop(intervalSec uint32) {
func (m *donConnectionManager) keepaliveLoop(intervalSec uint32) {
ctx, _ := m.shutdownCh.NewCtx()
defer m.closeWait.Done()

if intervalSec == 0 {
m.lggr.Error("heartbeat interval is 0, heartbeat disabled")
m.lggr.Errorw("keepalive interval is 0, keepalive disabled", "donID", m.donConfig.DonId)
return
}
m.lggr.Info("starting heartbeat loop")
m.lggr.Infow("starting keepalive loop", "donID", m.donConfig.DonId)

ticker := time.NewTicker(time.Duration(intervalSec) * time.Second)
defer ticker.Stop()
keepaliveTicker := time.NewTicker(time.Duration(intervalSec) * time.Second)
defer keepaliveTicker.Stop()

for {
select {
case <-m.shutdownCh:
return
case <-ticker.C:
case <-keepaliveTicker.C:
errorCount := 0
for nodeAddress, nodeState := range m.nodes {
err := nodeState.conn.Write(ctx, websocket.PingMessage, []byte{})
if err != nil {
m.lggr.Debugw("unable to send heartbeat to node", "nodeAddress", nodeAddress, "err", err)
m.lggr.Debugw("unable to send keepalive ping to node", "nodeAddress", nodeAddress, "name", nodeState.name, "donID", m.donConfig.DonId, "err", err)
errorCount++
}
}
promHeartbeatsSent.WithLabelValues(m.donConfig.DonId).Set(float64(len(m.nodes) - errorCount))
m.lggr.Infow("sent heartbeat to nodes", "donID", m.donConfig.DonId, "errCount", errorCount)
promKeepalivesSent.WithLabelValues(m.donConfig.DonId).Set(float64(len(m.nodes) - errorCount))
m.lggr.Infow("sent keepalive pings to nodes", "donID", m.donConfig.DonId, "errCount", errorCount)
}
}
}
16 changes: 16 additions & 0 deletions core/services/gateway/connectionmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,19 @@ func TestConnectionManager_SendToNode_Failures(t *testing.T) {
err = donMgr.SendToNode(testutils.Context(t), "some_other_node", message)
require.Error(t, err)
}

func TestConnectionManager_CleanStartClose(t *testing.T) {
t.Parallel()

config, _ := newTestConfig(t, 2)
config.ConnectionManagerConfig.HeartbeatIntervalSec = 1
clock := utils.NewFixedClock(time.Now())
mgr, err := gateway.NewConnectionManager(config, clock, logger.TestLogger(t))
require.NoError(t, err)

err = mgr.Start(testutils.Context(t))
require.NoError(t, err)

err = mgr.Close()
require.NoError(t, err)
}

0 comments on commit 3c78b78

Please sign in to comment.