diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 498d355d7f7..a4167eccdf0 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -47,7 +47,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/promreporter" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury" - "github.com/smartcontractkit/chainlink/v2/core/services/synchronization" "github.com/smartcontractkit/chainlink/v2/core/services/telemetry" "github.com/smartcontractkit/chainlink/v2/core/services/vrf" "github.com/smartcontractkit/chainlink/v2/core/services/webhook" @@ -219,24 +218,8 @@ func NewApplication(opts ApplicationOpts) (Application, error) { healthChecker := services.NewChecker() - telemetryIngressClient := synchronization.TelemetryIngressClient(&synchronization.NoopTelemetryIngressClient{}) - telemetryIngressBatchClient := synchronization.TelemetryIngressBatchClient(&synchronization.NoopTelemetryIngressBatchClient{}) - monitoringEndpointGen := telemetry.MonitoringEndpointGenerator(&telemetry.NoopAgent{}) - - ticfg := cfg.TelemetryIngress() - if ticfg.URL() != nil { - if ticfg.UseBatchSend() { - telemetryIngressBatchClient = synchronization.NewTelemetryIngressBatchClient(ticfg.URL(), - ticfg.ServerPubKey(), keyStore.CSA(), ticfg.Logging(), globalLogger, ticfg.BufferSize(), ticfg.MaxBatchSize(), ticfg.SendInterval(), ticfg.SendTimeout(), ticfg.UniConn()) - monitoringEndpointGen = telemetry.NewIngressAgentBatchWrapper(telemetryIngressBatchClient) - - } else { - telemetryIngressClient = synchronization.NewTelemetryIngressClient(ticfg.URL(), - ticfg.ServerPubKey(), keyStore.CSA(), ticfg.Logging(), globalLogger, ticfg.BufferSize()) - monitoringEndpointGen = telemetry.NewIngressAgentWrapper(telemetryIngressClient) - } - } - srvcs = append(srvcs, telemetryIngressClient, telemetryIngressBatchClient) + telemetryManager := telemetry.NewManager(cfg.TelemetryIngress(), keyStore.CSA(), globalLogger) + srvcs = append(srvcs, telemetryManager) backupCfg := cfg.Database().Backup() if backupCfg.Mode() != config.DatabaseBackupModeNone && backupCfg.Frequency() > 0 { @@ -361,7 +344,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { keyStore, pipelineRunner, peerWrapper, - monitoringEndpointGen, + telemetryManager, legacyEVMChains, globalLogger, cfg.Database(), @@ -381,7 +364,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { mercuryORM, pipelineRunner, peerWrapper, - monitoringEndpointGen, + telemetryManager, legacyEVMChains, globalLogger, ocr2DelegateConfig, diff --git a/core/services/synchronization/telemetry_ingress_batch_client.go b/core/services/synchronization/telemetry_ingress_batch_client.go index 333352dff2d..ccafc32bc3d 100644 --- a/core/services/synchronization/telemetry_ingress_batch_client.go +++ b/core/services/synchronization/telemetry_ingress_batch_client.go @@ -4,15 +4,14 @@ import ( "context" "errors" "fmt" + "net/url" "sync" "sync/atomic" "time" "github.com/smartcontractkit/wsrpc" "github.com/smartcontractkit/wsrpc/examples/simple/keys" - "go.uber.org/multierr" - "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" @@ -50,12 +49,13 @@ func (NoopTelemetryIngressBatchClient) Ready() error { return nil } type telemetryIngressBatchClient struct { utils.StartStopOnce - endpoints []TelemetryEndpoint - ks keystore.CSA + url *url.URL + ks keystore.CSA + serverPubKeyHex string - connected map[string]*atomic.Bool - telemClient map[string]*telemPb.TelemClient - close map[string]func() error + connected atomic.Bool + telemClient telemPb.TelemClient + close func() error globalLogger logger.Logger logging bool @@ -77,69 +77,24 @@ type telemetryIngressBatchClient struct { // NewTelemetryIngressBatchClient returns a client backed by wsrpc that // can send telemetry to the telemetry ingress server -func NewTelemetryIngressBatchClient(cfg config.TelemetryIngress, ks keystore.CSA, lggr logger.Logger) TelemetryIngressBatchClient { - +func NewTelemetryIngressBatchClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint, telemMaxBatchSize uint, telemSendInterval time.Duration, telemSendTimeout time.Duration, useUniconn bool) TelemetryIngressBatchClient { return &telemetryIngressBatchClient{ - endpoints: parseEndpoints(cfg.Endpoints(), lggr), - telemBufferSize: cfg.BufferSize(), - telemMaxBatchSize: cfg.MaxBatchSize(), - telemSendInterval: cfg.SendInterval(), - telemSendTimeout: cfg.SendTimeout(), + telemBufferSize: telemBufferSize, + telemMaxBatchSize: telemMaxBatchSize, + telemSendInterval: telemSendInterval, + telemSendTimeout: telemSendTimeout, + url: url, ks: ks, + serverPubKeyHex: serverPubKeyHex, globalLogger: lggr, - logging: cfg.Logging(), + logging: logging, lggr: lggr.Named("TelemetryIngressBatchClient"), chDone: make(chan struct{}), workers: make(map[string]*telemetryIngressBatchWorker), - useUniConn: cfg.UniConn(), + useUniConn: useUniconn, } } -func (tc *telemetryIngressBatchClient) connect(ctx context.Context, e TelemetryEndpoint) error { - clientPrivKey, err := tc.getCSAPrivateKey() - if err != nil { - return err - } - - srvPubKey := keys.FromHex(e.ServerPubKey) - - // Initialize a new wsrpc client caller - // This is used to call RPC methods on the server - if tc.telemClient == nil { // only preset for tests - if tc.useUniConn { - go func() { - // Use background context to retry forever to connect - // Blocks until we connect - conn, err := wsrpc.DialUniWithContext(ctx, tc.lggr, e.URL.String(), clientPrivKey, srvPubKey) - if err != nil { - if ctx.Err() != nil { - tc.lggr.Warnw("gave up connecting to telemetry endpoint", "err", err) - } else { - tc.lggr.Criticalw("telemetry endpoint dial errored unexpectedly", "err", err) - tc.SvcErrBuffer.Append(err) - } - } else { - telemClient := telemPb.NewTelemClient(conn) - tc.telemClient[createTelemClientKey(e.Network, e.ChainID)] = &telemClient - tc.close[createTelemClientKey(e.Network, e.ChainID)] = conn.Close - tc.connected[createTelemClientKey(e.Network, e.ChainID)].Store(true) - } - }() - } else { - // Spawns a goroutine that will eventually connect - conn, err := wsrpc.DialWithContext(ctx, e.URL.String(), wsrpc.WithTransportCreds(clientPrivKey, srvPubKey), wsrpc.WithLogger(tc.lggr)) - if err != nil { - return fmt.Errorf("could not start TelemIngressBatchClient, Dial returned error: %v", err) - } - telemClient := telemPb.NewTelemClient(conn) - tc.telemClient[createTelemClientKey(e.Network, e.ChainID)] = &telemClient - tc.close[createTelemClientKey(e.Network, e.ChainID)] = func() error { conn.Close(); return nil } - } - } - - return nil -} - // Start connects the wsrpc client to the telemetry ingress server // // If a connection cannot be established with the ingress server, Dial will return without @@ -147,12 +102,47 @@ func (tc *telemetryIngressBatchClient) connect(ctx context.Context, e TelemetryE // server does come back up, wsrpc will establish the connection without any interaction // on behalf of the node operator. func (tc *telemetryIngressBatchClient) Start(ctx context.Context) error { - var err error return tc.StartOnce("TelemetryIngressBatchClient", func() error { - for _, e := range tc.endpoints { - err = multierr.Append(err, tc.connect(ctx, e)) + clientPrivKey, err := tc.getCSAPrivateKey() + if err != nil { + return err } - return err + + serverPubKey := keys.FromHex(tc.serverPubKeyHex) + + // Initialize a new wsrpc client caller + // This is used to call RPC methods on the server + if tc.telemClient == nil { // only preset for tests + if tc.useUniConn { + go func() { + // Use background context to retry forever to connect + // Blocks until we connect + conn, err := wsrpc.DialUniWithContext(ctx, tc.lggr, tc.url.String(), clientPrivKey, serverPubKey) + if err != nil { + if ctx.Err() != nil { + tc.lggr.Warnw("gave up connecting to telemetry endpoint", "err", err) + } else { + tc.lggr.Criticalw("telemetry endpoint dial errored unexpectedly", "err", err) + tc.SvcErrBuffer.Append(err) + } + } else { + tc.telemClient = telemPb.NewTelemClient(conn) + tc.close = conn.Close + tc.connected.Store(true) + } + }() + } else { + // Spawns a goroutine that will eventually connect + conn, err := wsrpc.DialWithContext(ctx, tc.url.String(), wsrpc.WithTransportCreds(clientPrivKey, serverPubKey), wsrpc.WithLogger(tc.lggr)) + if err != nil { + return fmt.Errorf("could not start TelemIngressBatchClient, Dial returned error: %v", err) + } + tc.telemClient = telemPb.NewTelemClient(conn) + tc.close = func() error { conn.Close(); return nil } + } + } + + return nil }) } @@ -161,12 +151,9 @@ func (tc *telemetryIngressBatchClient) Close() error { return tc.StopOnce("TelemetryIngressBatchClient", func() error { close(tc.chDone) tc.wgDone.Wait() - for k := range tc.connected { - if (tc.useUniConn && tc.connected[k].Load()) || !tc.useUniConn { - return tc.close[k]() - } + if (tc.useUniConn && tc.connected.Load()) || !tc.useUniConn { + return tc.close() } - return nil }) } @@ -196,8 +183,8 @@ func (tc *telemetryIngressBatchClient) getCSAPrivateKey() (privkey []byte, err e // the ingress server. If the worker telemetry buffer is full, messages are dropped // and a warning is logged. func (tc *telemetryIngressBatchClient) Send(payload TelemPayload) { - if tc.useUniConn && !tc.connected[createTelemClientKey(payload.Network, payload.ChainID)].Load() { - //tc.lggr.Warnw("not connected to telemetry endpoint", "endpoint", tc.url.String()) + if tc.useUniConn && !tc.connected.Load() { + tc.lggr.Warnw("not connected to telemetry endpoint", "endpoint", tc.url.String()) return } worker := tc.findOrCreateWorker(payload) @@ -216,22 +203,15 @@ func (tc *telemetryIngressBatchClient) findOrCreateWorker(payload TelemPayload) tc.workersMutex.Lock() defer tc.workersMutex.Unlock() - workerKey := fmt.Sprintf("%s_%s_%s_%s", payload.ContractID, payload.TelemType, payload.Network, payload.ChainID) + workerKey := fmt.Sprintf("%s_%s", payload.ContractID, payload.TelemType) worker, found := tc.workers[workerKey] if !found { - - telemClient, err := tc.findTelemClient(payload.Network, payload.ChainID) - if err != nil { - tc.lggr.Warnw("cannot find telemetry client", "network", payload.Network, "chainID", payload.ChainID) - return nil - } - worker = NewTelemetryIngressBatchWorker( tc.telemMaxBatchSize, tc.telemSendInterval, tc.telemSendTimeout, - *telemClient, + tc.telemClient, &tc.wgDone, tc.chDone, make(chan TelemPayload, tc.telemBufferSize), @@ -246,16 +226,3 @@ func (tc *telemetryIngressBatchClient) findOrCreateWorker(payload TelemPayload) return worker } - -func (tc *telemetryIngressBatchClient) findTelemClient(network string, chainID string) (*telemPb.TelemClient, error) { - telemClient, ok := tc.telemClient[createTelemClientKey(network, chainID)] - if !ok { - return nil, errors.New("cannot find telemetry client for network " + network + " chainID " + chainID) - } - return telemClient, nil - -} - -func createTelemClientKey(network string, chainID string) string { - return fmt.Sprintf("%s_%s", network, chainID) -} diff --git a/core/services/synchronization/telemetry_ingress_client.go b/core/services/synchronization/telemetry_ingress_client.go index 40e9b6b6ee7..96b3c2375c7 100644 --- a/core/services/synchronization/telemetry_ingress_client.go +++ b/core/services/synchronization/telemetry_ingress_client.go @@ -11,7 +11,6 @@ import ( "github.com/smartcontractkit/wsrpc" "github.com/smartcontractkit/wsrpc/examples/simple/keys" - "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" @@ -49,8 +48,9 @@ func (NoopTelemetryIngressClient) Ready() error { return nil } type telemetryIngressClient struct { utils.StartStopOnce - endpoints []TelemetryEndpoint - ks keystore.CSA + url *url.URL + ks keystore.CSA + serverPubKeyHex string telemClient telemPb.TelemClient logging bool @@ -71,24 +71,17 @@ type TelemPayload struct { ChainID string } -type TelemetryEndpoint struct { - Network string - ChainID string - URL url.URL - ServerPubKey string -} - // NewTelemetryIngressClient returns a client backed by wsrpc that // can send telemetry to the telemetry ingress server -func NewTelemetryIngressClient(ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint, endpoints []config.TelemetryIngressEndpoint) TelemetryIngressClient { - lggr = lggr.Named("TelemetryIngressClient") +func NewTelemetryIngressClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint) TelemetryIngressClient { return &telemetryIngressClient{ - endpoints: parseEndpoints(endpoints, lggr), - ks: ks, - logging: logging, - lggr: lggr, - chTelemetry: make(chan TelemPayload, telemBufferSize), - chDone: make(chan struct{}), + url: url, + ks: ks, + serverPubKeyHex: serverPubKeyHex, + logging: logging, + lggr: lggr.Named("TelemetryIngressClient"), + chTelemetry: make(chan TelemPayload, telemBufferSize), + chDone: make(chan struct{}), } } @@ -99,9 +92,8 @@ func (tc *telemetryIngressClient) Start(ctx context.Context) error { if err != nil { return err } - for _, e := range tc.endpoints { - tc.connect(ctx, privkey, e.URL, e.ServerPubKey) - } + + tc.connect(ctx, privkey) return nil }) @@ -124,27 +116,15 @@ func (tc *telemetryIngressClient) HealthReport() map[string]error { return map[string]error{tc.Name(): tc.StartStopOnce.Healthy()} } -func parseEndpoints(c []config.TelemetryIngressEndpoint, lggr logger.Logger) []TelemetryEndpoint { - var telemEndpoints []TelemetryEndpoint - for _, e := range c { - if e.URL() == nil { - lggr.Warnw("telemetry endpoint does not have URL", "network", e.Network(), "chainID", e.ChainID()) - continue - } - telemEndpoints = append(telemEndpoints, TelemetryEndpoint{e.Network(), e.ChainID(), *e.URL(), e.ServerPubKey()}) - } - return telemEndpoints -} - -func (tc *telemetryIngressClient) connect(ctx context.Context, clientPrivKey []byte, url url.URL, serverPubKeyHex string) { +func (tc *telemetryIngressClient) connect(ctx context.Context, clientPrivKey []byte) { tc.wgDone.Add(1) go func() { defer tc.wgDone.Done() - serverPubKey := keys.FromHex(serverPubKeyHex) + serverPubKey := keys.FromHex(tc.serverPubKeyHex) - conn, err := wsrpc.DialWithContext(ctx, url.String(), wsrpc.WithTransportCreds(clientPrivKey, serverPubKey), wsrpc.WithLogger(tc.lggr)) + conn, err := wsrpc.DialWithContext(ctx, tc.url.String(), wsrpc.WithTransportCreds(clientPrivKey, serverPubKey), wsrpc.WithLogger(tc.lggr)) if err != nil { if ctx.Err() != nil { tc.lggr.Warnw("gave up connecting to telemetry endpoint", "err", err) diff --git a/core/services/telemetry/ingress.go b/core/services/telemetry/ingress.go index 360953184de..98ce69d581f 100644 --- a/core/services/telemetry/ingress.go +++ b/core/services/telemetry/ingress.go @@ -19,20 +19,24 @@ func NewIngressAgentWrapper(telemetryIngressClient synchronization.TelemetryIngr } func (t *IngressAgentWrapper) GenMonitoringEndpoint(contractID string, telemType synchronization.TelemetryType, network string, chainID string) ocrtypes.MonitoringEndpoint { - return NewIngressAgent(t.telemetryIngressClient, contractID, telemType) + return NewIngressAgent(t.telemetryIngressClient, contractID, telemType, network, chainID) } type IngressAgent struct { telemetryIngressClient synchronization.TelemetryIngressClient contractID string telemType synchronization.TelemetryType + network string + chainID string } -func NewIngressAgent(telemetryIngressClient synchronization.TelemetryIngressClient, contractID string, telemType synchronization.TelemetryType) *IngressAgent { +func NewIngressAgent(telemetryIngressClient synchronization.TelemetryIngressClient, contractID string, telemType synchronization.TelemetryType, network string, chainID string) *IngressAgent { return &IngressAgent{ telemetryIngressClient, contractID, telemType, + network, + chainID, } } diff --git a/core/services/telemetry/manager.go b/core/services/telemetry/manager.go new file mode 100644 index 00000000000..32a04d08134 --- /dev/null +++ b/core/services/telemetry/manager.go @@ -0,0 +1,170 @@ +package telemetry + +import ( + "context" + "fmt" + "net/url" + "strings" + "time" + + "github.com/pkg/errors" + "github.com/smartcontractkit/libocr/commontypes" + "go.uber.org/multierr" + + "github.com/smartcontractkit/chainlink/v2/core/config" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore" + "github.com/smartcontractkit/chainlink/v2/core/services/synchronization" + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +type TelemetryClient interface { + services.ServiceCtx + Send(payload synchronization.TelemPayload) +} + +type Manager struct { + utils.StartStopOnce + bufferSize uint + endpoints []*telemetryEndpoint + ks keystore.CSA + lggr logger.Logger + logging bool + maxBatchSize uint + sendInterval time.Duration + sendTimeout time.Duration + uniConn bool + useBatchSend bool + MonitoringEndpointGenerator MonitoringEndpointGenerator +} + +type telemetryEndpoint struct { + utils.StartStopOnce + ChainID string + Network string + URL *url.URL + client TelemetryClient + pubKey string +} + +func NewManager(cfg config.TelemetryIngress, ks keystore.CSA, lggr logger.Logger) *Manager { + m := &Manager{ + bufferSize: cfg.BufferSize(), + endpoints: nil, + ks: ks, + lggr: lggr.Named("TelemetryManager"), + logging: cfg.Logging(), + maxBatchSize: cfg.MaxBatchSize(), + sendInterval: cfg.SendInterval(), + sendTimeout: cfg.SendTimeout(), + uniConn: cfg.UniConn(), + useBatchSend: cfg.UseBatchSend(), + } + for _, e := range cfg.Endpoints() { + if err := m.addEndpoint(e); err != nil { + m.lggr.Error(err.Error()) + } + + } + return m +} + +func (m *Manager) Start(ctx context.Context) error { + return m.StartOnce("TelemetryManager", func() error { + var err error + for _, e := range m.endpoints { + err = multierr.Append(err, e.client.Start(ctx)) + } + return err + }) +} +func (m *Manager) Close() error { + return m.StopOnce("TelemetryManager", func() error { + var err error + for _, e := range m.endpoints { + err = multierr.Append(err, e.client.Close()) + } + return err + }) +} + +func (m *Manager) Name() string { + return m.lggr.Name() +} + +func (m *Manager) HealthReport() map[string]error { + hr := make(map[string]error) + hr[m.lggr.Name()] = m.Healthy() + for _, e := range m.endpoints { + name := fmt.Sprintf("%s.%s.%s", m.lggr.Name(), e.Network, e.ChainID) + hr[name] = e.StartStopOnce.Healthy() + } + return hr +} + +func (m *Manager) GenMonitoringEndpoint(contractID string, telemType synchronization.TelemetryType, network string, chainID string) commontypes.MonitoringEndpoint { + + e, found := m.getEndpoint(network, chainID) + + if !found { + m.lggr.Warnf("no telemetry endpoint found for network %q chainID %q, telemetry %q for contactID %q will NOT be sent", network, chainID, telemType, contractID) + return &NoopAgent{} + } + + if m.useBatchSend { + return NewIngressAgentBatch(e.client, contractID, telemType, network, chainID) + } else { + return NewIngressAgent(e.client, contractID, telemType, network, chainID) + } + +} + +func (m *Manager) addEndpoint(e config.TelemetryIngressEndpoint) error { + if e.Network() == "" { + return errors.New("cannot add telemetry endpoint, network cannot be empty") + } + + if e.ChainID() == "" { + return errors.New("cannot add telemetry endpoint, chainID cannot be empty") + } + + if e.URL() == nil { + return errors.New("cannot add telemetry endpoint, URL cannot be empty") + } + + if e.ServerPubKey() == "" { + return errors.New("cannot add telemetry endpoint, ServerPubKey cannot be empty") + } + + if _, found := m.getEndpoint(e.Network(), e.ChainID()); found != false { + return errors.New(fmt.Sprintf("cannot add telemetry endpoint for network %q and chainID %q, endpoint already exists", e.Network(), e.ChainID())) + } + + var tClient TelemetryClient + if m.useBatchSend { + tClient = synchronization.NewTelemetryIngressBatchClient(e.URL(), e.ServerPubKey(), m.ks, m.logging, m.lggr, m.bufferSize, m.maxBatchSize, m.sendInterval, m.sendTimeout, m.uniConn) + } else { + tClient = synchronization.NewTelemetryIngressClient(e.URL(), e.ServerPubKey(), m.ks, m.logging, m.lggr, m.bufferSize) + } + + te := telemetryEndpoint{ + Network: strings.ToUpper(e.Network()), + ChainID: strings.ToUpper(e.ChainID()), + URL: e.URL(), + pubKey: e.ServerPubKey(), + client: tClient, + } + + m.endpoints = append(m.endpoints, &te) + return nil +} + +func (m *Manager) getEndpoint(network string, chainID string) (*telemetryEndpoint, bool) { + for _, e := range m.endpoints { + if e.Network == strings.ToUpper(network) && e.ChainID == strings.ToUpper(chainID) { + return e, true + } + } + return nil, false +}