Skip to content

Commit

Permalink
Add manager
Browse files Browse the repository at this point in the history
  • Loading branch information
george-dorin committed Sep 14, 2023
1 parent 2af09a8 commit 5103517
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 153 deletions.
25 changes: 4 additions & 21 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/promreporter"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury"
"github.com/smartcontractkit/chainlink/v2/core/services/synchronization"
"github.com/smartcontractkit/chainlink/v2/core/services/telemetry"
"github.com/smartcontractkit/chainlink/v2/core/services/vrf"
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
Expand Down Expand Up @@ -219,24 +218,8 @@ func NewApplication(opts ApplicationOpts) (Application, error) {

healthChecker := services.NewChecker()

telemetryIngressClient := synchronization.TelemetryIngressClient(&synchronization.NoopTelemetryIngressClient{})
telemetryIngressBatchClient := synchronization.TelemetryIngressBatchClient(&synchronization.NoopTelemetryIngressBatchClient{})
monitoringEndpointGen := telemetry.MonitoringEndpointGenerator(&telemetry.NoopAgent{})

ticfg := cfg.TelemetryIngress()
if ticfg.URL() != nil {
if ticfg.UseBatchSend() {
telemetryIngressBatchClient = synchronization.NewTelemetryIngressBatchClient(ticfg.URL(),
ticfg.ServerPubKey(), keyStore.CSA(), ticfg.Logging(), globalLogger, ticfg.BufferSize(), ticfg.MaxBatchSize(), ticfg.SendInterval(), ticfg.SendTimeout(), ticfg.UniConn())
monitoringEndpointGen = telemetry.NewIngressAgentBatchWrapper(telemetryIngressBatchClient)

} else {
telemetryIngressClient = synchronization.NewTelemetryIngressClient(ticfg.URL(),
ticfg.ServerPubKey(), keyStore.CSA(), ticfg.Logging(), globalLogger, ticfg.BufferSize())
monitoringEndpointGen = telemetry.NewIngressAgentWrapper(telemetryIngressClient)
}
}
srvcs = append(srvcs, telemetryIngressClient, telemetryIngressBatchClient)
telemetryManager := telemetry.NewManager(cfg.TelemetryIngress(), keyStore.CSA(), globalLogger)
srvcs = append(srvcs, telemetryManager)

backupCfg := cfg.Database().Backup()
if backupCfg.Mode() != config.DatabaseBackupModeNone && backupCfg.Frequency() > 0 {
Expand Down Expand Up @@ -361,7 +344,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
keyStore,
pipelineRunner,
peerWrapper,
monitoringEndpointGen,
telemetryManager,
legacyEVMChains,
globalLogger,
cfg.Database(),
Expand All @@ -381,7 +364,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
mercuryORM,
pipelineRunner,
peerWrapper,
monitoringEndpointGen,
telemetryManager,
legacyEVMChains,
globalLogger,
ocr2DelegateConfig,
Expand Down
155 changes: 61 additions & 94 deletions core/services/synchronization/telemetry_ingress_batch_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ import (
"context"
"errors"
"fmt"
"net/url"
"sync"
"sync/atomic"
"time"

"github.com/smartcontractkit/wsrpc"
"github.com/smartcontractkit/wsrpc/examples/simple/keys"
"go.uber.org/multierr"

"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
Expand Down Expand Up @@ -50,12 +49,13 @@ func (NoopTelemetryIngressBatchClient) Ready() error { return nil }

type telemetryIngressBatchClient struct {
utils.StartStopOnce
endpoints []TelemetryEndpoint
ks keystore.CSA
url *url.URL
ks keystore.CSA
serverPubKeyHex string

connected map[string]*atomic.Bool
telemClient map[string]*telemPb.TelemClient
close map[string]func() error
connected atomic.Bool
telemClient telemPb.TelemClient
close func() error

globalLogger logger.Logger
logging bool
Expand All @@ -77,82 +77,72 @@ type telemetryIngressBatchClient struct {

// NewTelemetryIngressBatchClient returns a client backed by wsrpc that
// can send telemetry to the telemetry ingress server
func NewTelemetryIngressBatchClient(cfg config.TelemetryIngress, ks keystore.CSA, lggr logger.Logger) TelemetryIngressBatchClient {

func NewTelemetryIngressBatchClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint, telemMaxBatchSize uint, telemSendInterval time.Duration, telemSendTimeout time.Duration, useUniconn bool) TelemetryIngressBatchClient {
return &telemetryIngressBatchClient{
endpoints: parseEndpoints(cfg.Endpoints(), lggr),
telemBufferSize: cfg.BufferSize(),
telemMaxBatchSize: cfg.MaxBatchSize(),
telemSendInterval: cfg.SendInterval(),
telemSendTimeout: cfg.SendTimeout(),
telemBufferSize: telemBufferSize,
telemMaxBatchSize: telemMaxBatchSize,
telemSendInterval: telemSendInterval,
telemSendTimeout: telemSendTimeout,
url: url,
ks: ks,
serverPubKeyHex: serverPubKeyHex,
globalLogger: lggr,
logging: cfg.Logging(),
logging: logging,
lggr: lggr.Named("TelemetryIngressBatchClient"),
chDone: make(chan struct{}),
workers: make(map[string]*telemetryIngressBatchWorker),
useUniConn: cfg.UniConn(),
useUniConn: useUniconn,
}
}

func (tc *telemetryIngressBatchClient) connect(ctx context.Context, e TelemetryEndpoint) error {
clientPrivKey, err := tc.getCSAPrivateKey()
if err != nil {
return err
}

srvPubKey := keys.FromHex(e.ServerPubKey)

// Initialize a new wsrpc client caller
// This is used to call RPC methods on the server
if tc.telemClient == nil { // only preset for tests
if tc.useUniConn {
go func() {
// Use background context to retry forever to connect
// Blocks until we connect
conn, err := wsrpc.DialUniWithContext(ctx, tc.lggr, e.URL.String(), clientPrivKey, srvPubKey)
if err != nil {
if ctx.Err() != nil {
tc.lggr.Warnw("gave up connecting to telemetry endpoint", "err", err)
} else {
tc.lggr.Criticalw("telemetry endpoint dial errored unexpectedly", "err", err)
tc.SvcErrBuffer.Append(err)
}
} else {
telemClient := telemPb.NewTelemClient(conn)
tc.telemClient[createTelemClientKey(e.Network, e.ChainID)] = &telemClient
tc.close[createTelemClientKey(e.Network, e.ChainID)] = conn.Close
tc.connected[createTelemClientKey(e.Network, e.ChainID)].Store(true)
}
}()
} else {
// Spawns a goroutine that will eventually connect
conn, err := wsrpc.DialWithContext(ctx, e.URL.String(), wsrpc.WithTransportCreds(clientPrivKey, srvPubKey), wsrpc.WithLogger(tc.lggr))
if err != nil {
return fmt.Errorf("could not start TelemIngressBatchClient, Dial returned error: %v", err)
}
telemClient := telemPb.NewTelemClient(conn)
tc.telemClient[createTelemClientKey(e.Network, e.ChainID)] = &telemClient
tc.close[createTelemClientKey(e.Network, e.ChainID)] = func() error { conn.Close(); return nil }
}
}

return nil
}

// Start connects the wsrpc client to the telemetry ingress server
//
// If a connection cannot be established with the ingress server, Dial will return without
// an error and wsrpc will continue to retry the connection. Eventually when the ingress
// server does come back up, wsrpc will establish the connection without any interaction
// on behalf of the node operator.
func (tc *telemetryIngressBatchClient) Start(ctx context.Context) error {
var err error
return tc.StartOnce("TelemetryIngressBatchClient", func() error {
for _, e := range tc.endpoints {
err = multierr.Append(err, tc.connect(ctx, e))
clientPrivKey, err := tc.getCSAPrivateKey()
if err != nil {
return err
}
return err

serverPubKey := keys.FromHex(tc.serverPubKeyHex)

// Initialize a new wsrpc client caller
// This is used to call RPC methods on the server
if tc.telemClient == nil { // only preset for tests
if tc.useUniConn {
go func() {
// Use background context to retry forever to connect
// Blocks until we connect
conn, err := wsrpc.DialUniWithContext(ctx, tc.lggr, tc.url.String(), clientPrivKey, serverPubKey)
if err != nil {
if ctx.Err() != nil {
tc.lggr.Warnw("gave up connecting to telemetry endpoint", "err", err)
} else {
tc.lggr.Criticalw("telemetry endpoint dial errored unexpectedly", "err", err)
tc.SvcErrBuffer.Append(err)
}
} else {
tc.telemClient = telemPb.NewTelemClient(conn)
tc.close = conn.Close
tc.connected.Store(true)
}
}()
} else {
// Spawns a goroutine that will eventually connect
conn, err := wsrpc.DialWithContext(ctx, tc.url.String(), wsrpc.WithTransportCreds(clientPrivKey, serverPubKey), wsrpc.WithLogger(tc.lggr))
if err != nil {
return fmt.Errorf("could not start TelemIngressBatchClient, Dial returned error: %v", err)
}
tc.telemClient = telemPb.NewTelemClient(conn)
tc.close = func() error { conn.Close(); return nil }
}
}

return nil
})
}

Expand All @@ -161,12 +151,9 @@ func (tc *telemetryIngressBatchClient) Close() error {
return tc.StopOnce("TelemetryIngressBatchClient", func() error {
close(tc.chDone)
tc.wgDone.Wait()
for k := range tc.connected {
if (tc.useUniConn && tc.connected[k].Load()) || !tc.useUniConn {
return tc.close[k]()
}
if (tc.useUniConn && tc.connected.Load()) || !tc.useUniConn {
return tc.close()
}

return nil
})
}
Expand Down Expand Up @@ -196,8 +183,8 @@ func (tc *telemetryIngressBatchClient) getCSAPrivateKey() (privkey []byte, err e
// the ingress server. If the worker telemetry buffer is full, messages are dropped
// and a warning is logged.
func (tc *telemetryIngressBatchClient) Send(payload TelemPayload) {
if tc.useUniConn && !tc.connected[createTelemClientKey(payload.Network, payload.ChainID)].Load() {
//tc.lggr.Warnw("not connected to telemetry endpoint", "endpoint", tc.url.String())
if tc.useUniConn && !tc.connected.Load() {
tc.lggr.Warnw("not connected to telemetry endpoint", "endpoint", tc.url.String())
return
}
worker := tc.findOrCreateWorker(payload)
Expand All @@ -216,22 +203,15 @@ func (tc *telemetryIngressBatchClient) findOrCreateWorker(payload TelemPayload)
tc.workersMutex.Lock()
defer tc.workersMutex.Unlock()

workerKey := fmt.Sprintf("%s_%s_%s_%s", payload.ContractID, payload.TelemType, payload.Network, payload.ChainID)
workerKey := fmt.Sprintf("%s_%s", payload.ContractID, payload.TelemType)
worker, found := tc.workers[workerKey]

if !found {

telemClient, err := tc.findTelemClient(payload.Network, payload.ChainID)
if err != nil {
tc.lggr.Warnw("cannot find telemetry client", "network", payload.Network, "chainID", payload.ChainID)
return nil
}

worker = NewTelemetryIngressBatchWorker(
tc.telemMaxBatchSize,
tc.telemSendInterval,
tc.telemSendTimeout,
*telemClient,
tc.telemClient,
&tc.wgDone,
tc.chDone,
make(chan TelemPayload, tc.telemBufferSize),
Expand All @@ -246,16 +226,3 @@ func (tc *telemetryIngressBatchClient) findOrCreateWorker(payload TelemPayload)

return worker
}

func (tc *telemetryIngressBatchClient) findTelemClient(network string, chainID string) (*telemPb.TelemClient, error) {
telemClient, ok := tc.telemClient[createTelemClientKey(network, chainID)]
if !ok {
return nil, errors.New("cannot find telemetry client for network " + network + " chainID " + chainID)
}
return telemClient, nil

}

func createTelemClientKey(network string, chainID string) string {
return fmt.Sprintf("%s_%s", network, chainID)
}
52 changes: 16 additions & 36 deletions core/services/synchronization/telemetry_ingress_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/smartcontractkit/wsrpc"
"github.com/smartcontractkit/wsrpc/examples/simple/keys"

"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
Expand Down Expand Up @@ -49,8 +48,9 @@ func (NoopTelemetryIngressClient) Ready() error { return nil }

type telemetryIngressClient struct {
utils.StartStopOnce
endpoints []TelemetryEndpoint
ks keystore.CSA
url *url.URL
ks keystore.CSA
serverPubKeyHex string

telemClient telemPb.TelemClient
logging bool
Expand All @@ -71,24 +71,17 @@ type TelemPayload struct {
ChainID string
}

type TelemetryEndpoint struct {
Network string
ChainID string
URL url.URL
ServerPubKey string
}

// NewTelemetryIngressClient returns a client backed by wsrpc that
// can send telemetry to the telemetry ingress server
func NewTelemetryIngressClient(ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint, endpoints []config.TelemetryIngressEndpoint) TelemetryIngressClient {
lggr = lggr.Named("TelemetryIngressClient")
func NewTelemetryIngressClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint) TelemetryIngressClient {
return &telemetryIngressClient{
endpoints: parseEndpoints(endpoints, lggr),
ks: ks,
logging: logging,
lggr: lggr,
chTelemetry: make(chan TelemPayload, telemBufferSize),
chDone: make(chan struct{}),
url: url,
ks: ks,
serverPubKeyHex: serverPubKeyHex,
logging: logging,
lggr: lggr.Named("TelemetryIngressClient"),
chTelemetry: make(chan TelemPayload, telemBufferSize),
chDone: make(chan struct{}),
}
}

Expand All @@ -99,9 +92,8 @@ func (tc *telemetryIngressClient) Start(ctx context.Context) error {
if err != nil {
return err
}
for _, e := range tc.endpoints {
tc.connect(ctx, privkey, e.URL, e.ServerPubKey)
}

tc.connect(ctx, privkey)

return nil
})
Expand All @@ -124,27 +116,15 @@ func (tc *telemetryIngressClient) HealthReport() map[string]error {
return map[string]error{tc.Name(): tc.StartStopOnce.Healthy()}
}

func parseEndpoints(c []config.TelemetryIngressEndpoint, lggr logger.Logger) []TelemetryEndpoint {
var telemEndpoints []TelemetryEndpoint
for _, e := range c {
if e.URL() == nil {
lggr.Warnw("telemetry endpoint does not have URL", "network", e.Network(), "chainID", e.ChainID())
continue
}
telemEndpoints = append(telemEndpoints, TelemetryEndpoint{e.Network(), e.ChainID(), *e.URL(), e.ServerPubKey()})
}
return telemEndpoints
}

func (tc *telemetryIngressClient) connect(ctx context.Context, clientPrivKey []byte, url url.URL, serverPubKeyHex string) {
func (tc *telemetryIngressClient) connect(ctx context.Context, clientPrivKey []byte) {
tc.wgDone.Add(1)

go func() {
defer tc.wgDone.Done()

serverPubKey := keys.FromHex(serverPubKeyHex)
serverPubKey := keys.FromHex(tc.serverPubKeyHex)

conn, err := wsrpc.DialWithContext(ctx, url.String(), wsrpc.WithTransportCreds(clientPrivKey, serverPubKey), wsrpc.WithLogger(tc.lggr))
conn, err := wsrpc.DialWithContext(ctx, tc.url.String(), wsrpc.WithTransportCreds(clientPrivKey, serverPubKey), wsrpc.WithLogger(tc.lggr))
if err != nil {
if ctx.Err() != nil {
tc.lggr.Warnw("gave up connecting to telemetry endpoint", "err", err)
Expand Down
Loading

0 comments on commit 5103517

Please sign in to comment.