diff --git a/core/services/gateway/connectionmanager.go b/core/services/gateway/connectionmanager.go index e5f7fb13afb..a3c39211c6e 100644 --- a/core/services/gateway/connectionmanager.go +++ b/core/services/gateway/connectionmanager.go @@ -27,9 +27,9 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/utils" ) -var promHeartbeatsSent = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "gateway_heartbeats_sent", - Help: "Metric to track the number of successful node heartbeates per DON", +var promKeepalivesSent = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "gateway_keepalives_sent", + Help: "Metric to track the number of successful keepalive ping messages per DON", }, []string{"don_id"}) // ConnectionManager holds all connections between Gateway and Nodes. @@ -77,6 +77,7 @@ type donConnectionManager struct { } type nodeState struct { + name string conn network.WSConnectionWrapper } @@ -107,10 +108,14 @@ func NewConnectionManager(gwConfig *config.GatewayConfig, clock utils.Clock, lgg if ok { return nil, fmt.Errorf("duplicate node address %s in DON %s", nodeAddress, donConfig.DonId) } - nodes[nodeAddress] = &nodeState{conn: network.NewWSConnectionWrapper(lggr)} - if nodes[nodeAddress].conn == nil { + connWrapper := network.NewWSConnectionWrapper(lggr) + if connWrapper == nil { return nil, fmt.Errorf("error creating WSConnectionWrapper for node %s", nodeAddress) } + nodes[nodeAddress] = &nodeState{ + name: nodeConfig.Name, + conn: connWrapper, + } } dons[donConfig.DonId] = &donConnectionManager{ donConfig: &donConfig, @@ -148,7 +153,7 @@ func (m *connectionManager) Start(ctx context.Context) error { go donConnMgr.readLoop(nodeAddress, nodeState) } donConnMgr.closeWait.Add(1) - go donConnMgr.heartbeatLoop(m.config.HeartbeatIntervalSec) + go donConnMgr.keepaliveLoop(m.config.HeartbeatIntervalSec) } return m.wsServer.Start(ctx) }) @@ -231,7 +236,7 @@ func (m *connectionManager) FinalizeHandshake(attemptId string, response []byte, } if conn != nil { conn.SetPongHandler(func(data string) error { - m.lggr.Debugw("received heartbeat pong from node", "nodeAddress", attempt.nodeAddress) + m.lggr.Debugw("received keepalive pong from node", "nodeAddress", attempt.nodeAddress) return nil }) } @@ -299,34 +304,34 @@ func (m *donConnectionManager) readLoop(nodeAddress string, nodeState *nodeState } } -func (m *donConnectionManager) heartbeatLoop(intervalSec uint32) { +func (m *donConnectionManager) keepaliveLoop(intervalSec uint32) { ctx, _ := m.shutdownCh.NewCtx() defer m.closeWait.Done() if intervalSec == 0 { - m.lggr.Error("heartbeat interval is 0, heartbeat disabled") + m.lggr.Errorw("keepalive interval is 0, keepalive disabled", "donID", m.donConfig.DonId) return } - m.lggr.Info("starting heartbeat loop") + m.lggr.Infow("starting keepalive loop", "donID", m.donConfig.DonId) - ticker := time.NewTicker(time.Duration(intervalSec) * time.Second) - defer ticker.Stop() + keepaliveTicker := time.NewTicker(time.Duration(intervalSec) * time.Second) + defer keepaliveTicker.Stop() for { select { case <-m.shutdownCh: return - case <-ticker.C: + case <-keepaliveTicker.C: errorCount := 0 for nodeAddress, nodeState := range m.nodes { err := nodeState.conn.Write(ctx, websocket.PingMessage, []byte{}) if err != nil { - m.lggr.Debugw("unable to send heartbeat to node", "nodeAddress", nodeAddress, "err", err) + m.lggr.Debugw("unable to send keepalive ping to node", "nodeAddress", nodeAddress, "name", nodeState.name, "donID", m.donConfig.DonId, "err", err) errorCount++ } } - promHeartbeatsSent.WithLabelValues(m.donConfig.DonId).Set(float64(len(m.nodes) - errorCount)) - m.lggr.Infow("sent heartbeat to nodes", "donID", m.donConfig.DonId, "errCount", errorCount) + promKeepalivesSent.WithLabelValues(m.donConfig.DonId).Set(float64(len(m.nodes) - errorCount)) + m.lggr.Infow("sent keepalive pings to nodes", "donID", m.donConfig.DonId, "errCount", errorCount) } } } diff --git a/core/services/gateway/connectionmanager_test.go b/core/services/gateway/connectionmanager_test.go index d198ef67295..b176837d9ca 100644 --- a/core/services/gateway/connectionmanager_test.go +++ b/core/services/gateway/connectionmanager_test.go @@ -227,3 +227,19 @@ func TestConnectionManager_SendToNode_Failures(t *testing.T) { err = donMgr.SendToNode(testutils.Context(t), "some_other_node", message) require.Error(t, err) } + +func TestConnectionManager_CleanStartClose(t *testing.T) { + t.Parallel() + + config, _ := newTestConfig(t, 2) + config.ConnectionManagerConfig.HeartbeatIntervalSec = 1 + clock := utils.NewFixedClock(time.Now()) + mgr, err := gateway.NewConnectionManager(config, clock, logger.TestLogger(t)) + require.NoError(t, err) + + err = mgr.Start(testutils.Context(t)) + require.NoError(t, err) + + err = mgr.Close() + require.NoError(t, err) +}