Skip to content

Commit

Permalink
[BCF-2750] Reorder telemetry params (#11147)
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier authored Nov 3, 2023
1 parent 4174f36 commit 3053562
Show file tree
Hide file tree
Showing 14 changed files with 50 additions and 50 deletions.
2 changes: 1 addition & 1 deletion core/services/functions/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func NewFunctionsListenerUniverse(t *testing.T, timeoutSec int, pruneFrequencySe

ingressClient := sync_mocks.NewTelemetryService(t)
ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient)
monEndpoint := ingressAgent.GenMonitoringEndpoint(contractAddress, synchronization.FunctionsRequests, "test-network", "test-chainID")
monEndpoint := ingressAgent.GenMonitoringEndpoint("test-network", "test-chainID", contractAddress, synchronization.FunctionsRequests)

s4Storage := s4_mocks.NewStorage(t)
client := chain.Client()
Expand Down
4 changes: 2 additions & 2 deletions core/services/ocr/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job) (services []job.ServiceCtx, err e

enhancedTelemChan := make(chan ocrcommon.EnhancedTelemetryData, 100)
if ocrcommon.ShouldCollectEnhancedTelemetry(&jb) {
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, enhancedTelemChan, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(concreteSpec.ContractAddress.String(), synchronization.EnhancedEA, "EVM", chain.ID().String()), lggr.Named("EnhancedTelemetry"))
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, enhancedTelemChan, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint("EVM", chain.ID().String(), concreteSpec.ContractAddress.String(), synchronization.EnhancedEA), lggr.Named("EnhancedTelemetry"))
services = append(services, enhancedTelemService)
}

Expand All @@ -319,7 +319,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job) (services []job.ServiceCtx, err e
Logger: ocrLogger,
V1Bootstrappers: v1BootstrapPeers,
V2Bootstrappers: v2Bootstrappers,
MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(concreteSpec.ContractAddress.String(), synchronization.OCR, "EVM", chain.ID().String()),
MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint("EVM", chain.ID().String(), concreteSpec.ContractAddress.String(), synchronization.OCR),
ConfigOverrider: configOverrider,
})
if err != nil {
Expand Down
22 changes: 11 additions & 11 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,10 +564,10 @@ func (d *Delegate) newServicesGenericPlugin(
srvs = append(srvs, provider)

oracleEndpoint := d.monitoringEndpointGen.GenMonitoringEndpoint(
spec.ContractID,
synchronization.TelemetryType(cconf.TelemetryType),
rid.Network,
rid.ChainID,
spec.ContractID,
synchronization.TelemetryType(cconf.TelemetryType),
)
oracleArgs := libocr2.OCR2OracleArgs{
BinaryNetworkEndpointFactory: d.peerWrapper.Peer2,
Expand Down Expand Up @@ -686,7 +686,7 @@ func (d *Delegate) newServicesMercury(
Database: ocrDB,
LocalConfig: lc,
Logger: ocrLogger,
MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.FeedID.String(), synchronization.OCR3Mercury, rid.Network, rid.ChainID),
MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), synchronization.OCR3Mercury),
OffchainConfigDigester: mercuryProvider.OffchainConfigDigester(),
OffchainKeyring: kb,
OnchainKeyring: kb,
Expand All @@ -697,7 +697,7 @@ func (d *Delegate) newServicesMercury(
mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, chain, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID))

if ocrcommon.ShouldCollectEnhancedTelemetryMercury(jb) {
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(spec.FeedID.String(), synchronization.EnhancedEAMercury, rid.Network, rid.ChainID), lggr.Named("EnhancedTelemetryMercury"))
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), synchronization.EnhancedEAMercury), lggr.Named("EnhancedTelemetryMercury"))
mercuryServices = append(mercuryServices, enhancedTelemService)
}

Expand Down Expand Up @@ -728,7 +728,7 @@ func (d *Delegate) newServicesMedian(
Database: ocrDB,
LocalConfig: lc,
Logger: ocrLogger,
MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2Median, rid.Network, rid.ChainID),
MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.ContractID, synchronization.OCR2Median),
OffchainKeyring: kb,
OnchainKeyring: kb,
}
Expand All @@ -744,7 +744,7 @@ func (d *Delegate) newServicesMedian(
medianServices, err2 := median.NewMedianServices(ctx, jb, d.isNewlyCreatedJob, relayer, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, mConfig, enhancedTelemChan, errorLog)

if ocrcommon.ShouldCollectEnhancedTelemetry(&jb) {
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, enhancedTelemChan, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.EnhancedEA, rid.Network, rid.ChainID), lggr.Named("EnhancedTelemetry"))
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, enhancedTelemChan, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.ContractID, synchronization.EnhancedEA), lggr.Named("EnhancedTelemetry"))
medianServices = append(medianServices, enhancedTelemService)
}

Expand Down Expand Up @@ -965,7 +965,7 @@ func (d *Delegate) newServicesOCR2VRF(
VRFContractTransmitter: vrfProvider.ContractTransmitter(),
VRFDatabase: ocrDB,
VRFLocalConfig: lc,
VRFMonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2VRF, rid.Network, rid.ChainID),
VRFMonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.ContractID, synchronization.OCR2VRF),
DKGContractConfigTracker: dkgProvider.ContractConfigTracker(),
DKGOffchainConfigDigester: dkgProvider.OffchainConfigDigester(),
DKGContract: dkgpkg.NewOnchainContract(dkgContract, &altbn_128.G2{}),
Expand Down Expand Up @@ -1104,7 +1104,7 @@ func (d *Delegate) newServicesOCR2Keepers21(
ContractConfigTracker: keeperProvider.ContractConfigTracker(),
KeepersDatabase: ocrDB,
Logger: ocrLogger,
MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2Automation, rid.Network, rid.ChainID),
MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.ContractID, synchronization.OCR2Automation),
OffchainConfigDigester: keeperProvider.OffchainConfigDigester(),
OffchainKeyring: kb,
OnchainKeyring: services.Keyring(),
Expand Down Expand Up @@ -1249,7 +1249,7 @@ func (d *Delegate) newServicesOCR2Keepers20(
KeepersDatabase: ocrDB,
LocalConfig: lc,
Logger: ocrLogger,
MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2Automation, rid.Network, rid.ChainID),
MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.ContractID, synchronization.OCR2Automation),
OffchainConfigDigester: keeperProvider.OffchainConfigDigester(),
OffchainKeyring: kb,
OnchainKeyring: kb,
Expand Down Expand Up @@ -1358,7 +1358,7 @@ func (d *Delegate) newServicesOCR2Functions(
Database: functionsOcrDB,
LocalConfig: lc,
Logger: ocrLogger,
MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2Functions, rid.Network, rid.ChainID),
MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.ContractID, synchronization.OCR2Functions),
OffchainConfigDigester: functionsProvider.OffchainConfigDigester(),
OffchainKeyring: kb,
OnchainKeyring: kb,
Expand Down Expand Up @@ -1422,7 +1422,7 @@ func (d *Delegate) newServicesOCR2Functions(
ContractID: spec.ContractID,
Logger: lggr,
MailMon: d.mailMon,
URLsMonEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.FunctionsRequests, rid.Network, rid.ChainID),
URLsMonEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.ContractID, synchronization.FunctionsRequests),
EthKeystore: d.ethKs,
ThresholdKeyShare: thresholdKeyShare,
LogPollerWrapper: functionsProvider.LogPollerWrapper(),
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr2/plugins/generic/telemetry_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (t *TelemetryAdapter) getOrCreateEndpoint(network string, chainID string, c
key := [4]string{network, chainID, contractID, telemetryType}
e, ok := t.endpoints[key]
if !ok {
e = t.endpointGenerator.GenMonitoringEndpoint(contractID, synchronization.TelemetryType(telemetryType), network, chainID)
e = t.endpointGenerator.GenMonitoringEndpoint(network, chainID, contractID, synchronization.TelemetryType(telemetryType))
t.endpoints[key] = e
}
return e, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (m *mockEndpoint) SendLog(payload []byte) { m.payload = payload }

type mockGenerator struct{}

func (m *mockGenerator) GenMonitoringEndpoint(contractID string, telemetryType synchronization.TelemetryType, network string, chainID string) commontypes.MonitoringEndpoint {
func (m *mockGenerator) GenMonitoringEndpoint(network string, chainID string, contractID string, telemetryType synchronization.TelemetryType) commontypes.MonitoringEndpoint {
return &mockEndpoint{
network: network,
chainID: chainID,
Expand Down
8 changes: 4 additions & 4 deletions core/services/ocrcommon/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestSendEATelemetry(t *testing.T) {
wg := sync.WaitGroup{}
ingressClient := mocks.NewTelemetryService(t)
ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient)
monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("0xa", synchronization.EnhancedEA, "test-network", "test-chainID")
monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("test-network", "test-chainID", "0xa", synchronization.EnhancedEA)

var sentMessage []byte
ingressClient.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) {
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestCollectAndSend(t *testing.T) {
wg := sync.WaitGroup{}
ingressClient := mocks.NewTelemetryService(t)
ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient)
monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("0xa", synchronization.EnhancedEA, "test-network", "test-chainID")
monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("test-network", "test-chainID", "0xa", synchronization.EnhancedEA)
ingressClient.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) {
wg.Done()
})
Expand Down Expand Up @@ -548,7 +548,7 @@ func TestCollectMercuryEnhancedTelemetryV1(t *testing.T) {
wg := sync.WaitGroup{}
ingressClient := mocks.NewTelemetryService(t)
ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient)
monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("0xa", synchronization.EnhancedEAMercury, "test-network", "test-chainID")
monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("test-network", "test-chainID", "0xa", synchronization.EnhancedEAMercury)

var sentMessage []byte
ingressClient.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) {
Expand Down Expand Up @@ -664,7 +664,7 @@ func TestCollectMercuryEnhancedTelemetryV2(t *testing.T) {
wg := sync.WaitGroup{}
ingressClient := mocks.NewTelemetryService(t)
ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient)
monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("0xa", synchronization.EnhancedEAMercury, "test-network", "test-chainID")
monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("test-network", "test-chainID", "0xa", synchronization.EnhancedEAMercury)

var sentMessage []byte
ingressClient.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) {
Expand Down
2 changes: 1 addition & 1 deletion core/services/telemetry/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ import (
)

type MonitoringEndpointGenerator interface {
GenMonitoringEndpoint(contractID string, telemType synchronization.TelemetryType, network string, chainID string) ocrtypes.MonitoringEndpoint
GenMonitoringEndpoint(network string, chainID string, contractID string, telemType synchronization.TelemetryType) ocrtypes.MonitoringEndpoint
}
14 changes: 7 additions & 7 deletions core/services/telemetry/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,25 @@ func NewIngressAgentWrapper(telemetryIngressClient synchronization.TelemetryServ
return &IngressAgentWrapper{telemetryIngressClient}
}

func (t *IngressAgentWrapper) GenMonitoringEndpoint(contractID string, telemType synchronization.TelemetryType, network string, chainID string) ocrtypes.MonitoringEndpoint {
return NewIngressAgent(t.telemetryIngressClient, contractID, telemType, network, chainID)
func (t *IngressAgentWrapper) GenMonitoringEndpoint(network, chainID string, contractID string, telemType synchronization.TelemetryType) ocrtypes.MonitoringEndpoint {
return NewIngressAgent(t.telemetryIngressClient, network, chainID, contractID, telemType)
}

type IngressAgent struct {
telemetryIngressClient synchronization.TelemetryService
contractID string
telemType synchronization.TelemetryType
network string
chainID string
contractID string
telemType synchronization.TelemetryType
}

func NewIngressAgent(telemetryIngressClient synchronization.TelemetryService, contractID string, telemType synchronization.TelemetryType, network string, chainID string) *IngressAgent {
func NewIngressAgent(telemetryIngressClient synchronization.TelemetryService, network string, chainID string, contractID string, telemType synchronization.TelemetryType) *IngressAgent {
return &IngressAgent{
telemetryIngressClient,
contractID,
telemType,
network,
chainID,
contractID,
telemType,
}
}

Expand Down
14 changes: 7 additions & 7 deletions core/services/telemetry/ingress_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,27 @@ func NewIngressAgentBatchWrapper(telemetryIngressBatchClient synchronization.Tel
}

// GenMonitoringEndpoint returns a new ingress batch agent instantiated with the batch client and a contractID
func (t *IngressAgentBatchWrapper) GenMonitoringEndpoint(contractID string, telemType synchronization.TelemetryType, network string, chainID string) ocrtypes.MonitoringEndpoint {
return NewIngressAgentBatch(t.telemetryIngressBatchClient, contractID, telemType, network, chainID)
func (t *IngressAgentBatchWrapper) GenMonitoringEndpoint(network string, chainID string, contractID string, telemType synchronization.TelemetryType) ocrtypes.MonitoringEndpoint {
return NewIngressAgentBatch(t.telemetryIngressBatchClient, network, chainID, contractID, telemType)
}

// IngressAgentBatch allows for sending batch telemetry for a given contractID
type IngressAgentBatch struct {
telemetryIngressBatchClient synchronization.TelemetryService
contractID string
telemType synchronization.TelemetryType
network string
chainID string
contractID string
telemType synchronization.TelemetryType
}

// NewIngressAgentBatch creates a new IngressAgentBatch with the given batch client and contractID
func NewIngressAgentBatch(telemetryIngressBatchClient synchronization.TelemetryService, contractID string, telemType synchronization.TelemetryType, network string, chainID string) *IngressAgentBatch {
func NewIngressAgentBatch(telemetryIngressBatchClient synchronization.TelemetryService, network string, chainID string, contractID string, telemType synchronization.TelemetryType) *IngressAgentBatch {
return &IngressAgentBatch{
telemetryIngressBatchClient,
contractID,
telemType,
network,
chainID,
contractID,
telemType,
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/services/telemetry/ingress_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func TestIngressAgentBatch(t *testing.T) {
telemetryBatchClient := mocks.NewTelemetryService(t)
ingressAgentBatch := telemetry.NewIngressAgentWrapper(telemetryBatchClient)
monitoringEndpoint := ingressAgentBatch.GenMonitoringEndpoint("0xa", synchronization.OCR, "test-network", "test-chainID")
monitoringEndpoint := ingressAgentBatch.GenMonitoringEndpoint("test-network", "test-chainID", "0xa", synchronization.OCR)

// Handle the Send call and store the telem
var telemPayload synchronization.TelemPayload
Expand Down
2 changes: 1 addition & 1 deletion core/services/telemetry/ingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func TestIngressAgent(t *testing.T) {
telemetryClient := mocks.NewTelemetryService(t)
ingressAgent := telemetry.NewIngressAgentWrapper(telemetryClient)
monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("0xa", synchronization.OCR, "test-network", "test-chainID")
monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("test-network", "test-chainID", "0xa", synchronization.OCR)

// Handle the Send call and store the telem
var telemPayload synchronization.TelemPayload
Expand Down
6 changes: 3 additions & 3 deletions core/services/telemetry/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (m *Manager) HealthReport() map[string]error {
}

// GenMonitoringEndpoint creates a new monitoring endpoints based on the existing available endpoints defined in the core config TOML, if no endpoint for the network and chainID exists, a NOOP agent will be used and the telemetry will not be sent
func (m *Manager) GenMonitoringEndpoint(contractID string, telemType synchronization.TelemetryType, network string, chainID string) commontypes.MonitoringEndpoint {
func (m *Manager) GenMonitoringEndpoint(network string, chainID string, contractID string, telemType synchronization.TelemetryType) commontypes.MonitoringEndpoint {

e, found := m.getEndpoint(network, chainID)

Expand All @@ -160,10 +160,10 @@ func (m *Manager) GenMonitoringEndpoint(contractID string, telemType synchroniza
}

if m.useBatchSend {
return NewIngressAgentBatch(e.client, contractID, telemType, network, chainID)
return NewIngressAgentBatch(e.client, network, chainID, contractID, telemType)
}

return NewIngressAgent(e.client, contractID, telemType, network, chainID)
return NewIngressAgent(e.client, network, chainID, contractID, telemType)

}

Expand Down
Loading

0 comments on commit 3053562

Please sign in to comment.