Skip to content

Commit

Permalink
Skip telemetry for market status bridges
Browse files Browse the repository at this point in the history
  • Loading branch information
martin-cll committed Sep 18, 2024
1 parent 8d51ac4 commit 830cfe7
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 24 deletions.
5 changes: 5 additions & 0 deletions .changeset/slow-lizards-shout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Skip telemetry for market-status bridges #internal
44 changes: 31 additions & 13 deletions core/services/ocrcommon/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,11 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced
e.lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry, job=%d, id=%s, name=%q", e.job.ID, trr.Task.DotID(), bridgeName), "err", err, "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName)
}

assetSymbol := e.getAssetSymbolFromRequestData(bridgeTask.RequestData)
parsedBridgeData := parseBridgeRequestData(bridgeTask.RequestData, d.FeedVersion)
if parsedBridgeData.IsMarketStatus {
// Only collect telemetry for pricing bridges.
continue
}

benchmarkPrice, bidPrice, askPrice := e.getPricesFromBridgeTask(trr, d.TaskRunResults, d.FeedVersion)

Expand Down Expand Up @@ -432,7 +436,7 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced
Round: int64(d.RepTimestamp.Round),
Epoch: int64(d.RepTimestamp.Epoch),
BridgeRequestData: bridgeTask.RequestData,
AssetSymbol: assetSymbol,
AssetSymbol: parsedBridgeData.AssetSymbol,
Version: uint32(d.FeedVersion),
}
e.lggr.Debugw(fmt.Sprintf("EA Telemetry = %+v", t), "feedID", e.job.OCR2OracleSpec.FeedID.Hex(), "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName)
Expand All @@ -459,12 +463,19 @@ func (e *EnhancedTelemetryService[T]) parseTelemetryAttributes(a string) (teleme
return *attrs, nil
}

// getAssetSymbolFromRequestData parses the requestData of the bridge to generate an asset symbol pair
func (e *EnhancedTelemetryService[T]) getAssetSymbolFromRequestData(requestData string) string {
type bridgeRequestData struct {
AssetSymbol string
IsMarketStatus bool
}

// parseRequestData parses the requestData of the bridge.
func parseBridgeRequestData(requestData string, mercuryVersion mercuryutils.FeedVersion) bridgeRequestData {
type reqDataPayload struct {
To *string `json:"to"`
From *string `json:"from"`
Address *string `json:"address"` // used for view function ea only
Endpoint *string `json:"endpoint"`
To *string `json:"to"`
From *string `json:"from"`
Address *string `json:"address"` // used for view function ea only
Market *string `json:"market"` // used for market status ea only
}
type reqData struct {
Data reqDataPayload `json:"data"`
Expand All @@ -473,18 +484,25 @@ func (e *EnhancedTelemetryService[T]) getAssetSymbolFromRequestData(requestData
rd := &reqData{}
err := json.Unmarshal([]byte(requestData), rd)
if err != nil {
return ""
return bridgeRequestData{}
}

if mercuryVersion == 4 && ((rd.Data.Endpoint != nil && *rd.Data.Endpoint == "market-status") || (rd.Data.Market != nil && *rd.Data.Market != "")) {
return bridgeRequestData{
AssetSymbol: *rd.Data.Market,
IsMarketStatus: true,
}
}

if rd.Data.From != nil && rd.Data.To != nil {
return *rd.Data.From + "/" + *rd.Data.To
return bridgeRequestData{AssetSymbol: *rd.Data.From + "/" + *rd.Data.To}
}

if rd.Data.Address != nil {
return *rd.Data.Address
return bridgeRequestData{AssetSymbol: *rd.Data.Address}
}

return ""
return bridgeRequestData{}
}

// ShouldCollectEnhancedTelemetryMercury checks if enhanced telemetry should be collected and sent
Expand Down Expand Up @@ -599,8 +617,8 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResultsByOrder(startTask pipe
benchmarkPrice = e.parsePriceFromTask(*benchmarkPriceTask)
}

// mercury version 2 only supports benchmarkPrice
if mercuryVersion == 2 {
// mercury versions 2 and 4 only supports benchmarkPrice
if mercuryVersion == 2 || mercuryVersion == 4 {
return benchmarkPrice, 0, 0
}

Expand Down
143 changes: 134 additions & 9 deletions core/services/ocrcommon/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,18 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/shopspring/decimal"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/types/mercury"
mercuryv1 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v1"
mercuryv2 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v2"

mercuryv4 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v4"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
Expand Down Expand Up @@ -274,7 +273,6 @@ func TestSendEATelemetry(t *testing.T) {
expectedMessage, _ := proto.Marshal(&expectedTelemetry)
wg.Wait()
assert.Equal(t, expectedMessage, sentMessage)
//enhancedTelemService.StopOnce("EnhancedTelemetryService", func() error { return nil })
doneCh <- struct{}{}
}

Expand Down Expand Up @@ -446,6 +444,45 @@ var trrsMercuryV2 = pipeline.TaskRunResults{
},
}

var trrsMercuryV4 = pipeline.TaskRunResults{
pipeline.TaskRunResult{
Task: &pipeline.BridgeTask{
Name: "link-usd-test-bridge-v2",
BaseTask: pipeline.NewBaseTask(0, "ds1", nil, nil, 0),
RequestData: `{"data":{"to":"LINK","from":"USD"}}`,
},
Result: pipeline.Result{
Value: bridgeResponse,
},
},
pipeline.TaskRunResult{
Task: &pipeline.JSONParseTask{
BaseTask: pipeline.NewBaseTask(1, "ds1_benchmark", nil, nil, 1),
},
Result: pipeline.Result{
Value: 123456.123456,
},
},
pipeline.TaskRunResult{
Task: &pipeline.BridgeTask{
Name: "market-status-bridge",
BaseTask: pipeline.NewBaseTask(2, "ds2", nil, nil, 2),
RequestData: `{"data":{"endpoint":"market-status","market":"forex"}}`,
},
Result: pipeline.Result{
Value: bridgeResponse,
},
},
pipeline.TaskRunResult{
Task: &pipeline.JSONParseTask{
BaseTask: pipeline.NewBaseTask(3, "market_status", nil, nil, 3),
},
Result: pipeline.Result{
Value: 2.0,
},
},
}

func TestGetPricesFromBridgeByTelemetryField(t *testing.T) {
lggr, _ := logger.TestLoggerObserved(t, zap.WarnLevel)
e := EnhancedTelemetryService[EnhancedTelemetryMercuryData]{
Expand Down Expand Up @@ -618,13 +655,23 @@ func TestShouldCollectEnhancedTelemetryMercury(t *testing.T) {
require.Equal(t, ShouldCollectEnhancedTelemetryMercury(j), false)
}

func TestGetAssetSymbolFromRequestData(t *testing.T) {
e := EnhancedTelemetryService[EnhancedTelemetryMercuryData]{}
require.Equal(t, e.getAssetSymbolFromRequestData(""), "")
func TestParseBridgeRequestData(t *testing.T) {
require.Equal(t, parseBridgeRequestData("", 2), bridgeRequestData{})

reqData := `{"data":{"to":"LINK","from":"USD"}}`
require.Equal(t, e.getAssetSymbolFromRequestData(reqData), "USD/LINK")
require.Equal(t, parseBridgeRequestData(reqData, 2), bridgeRequestData{AssetSymbol: "USD/LINK"})

reqData = `{"data":{"to":"LINK","from":"USD","market":"forex"}}`
require.Equal(t, parseBridgeRequestData(reqData, 2), bridgeRequestData{AssetSymbol: "USD/LINK"})

reqData = `{"data":{"endpoint":"market-status","market":"forex"}}`
require.Equal(t, parseBridgeRequestData(reqData, 4), bridgeRequestData{AssetSymbol: "forex", IsMarketStatus: true})

reqData = `{"data":{"market":"metals"}}`
require.Equal(t, parseBridgeRequestData(reqData, 4), bridgeRequestData{AssetSymbol: "metals", IsMarketStatus: true})

viewFunctionReqData := `{"data":{"address":"0x12345678", "signature": "function stEthPerToken() view returns (int256)"}}`
require.Equal(t, "0x12345678", e.getAssetSymbolFromRequestData(viewFunctionReqData))
require.Equal(t, parseBridgeRequestData(viewFunctionReqData, 3), bridgeRequestData{AssetSymbol: "0x12345678"})
}

func getViewFunctionTaskRunResults() pipeline.TaskRunResults {
Expand Down Expand Up @@ -1019,3 +1066,81 @@ func TestCollectMercuryEnhancedTelemetryV2(t *testing.T) {
require.Contains(t, logs.All()[3].Message, "cannot parse enhanced EA telemetry bid price")
chDone <- struct{}{}
}

func TestCollectMercuryEnhancedTelemetryV4(t *testing.T) {
ingressClient := mocks.NewTelemetryService(t)
ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient)
monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("test-network", "test-chainID", "0xa", synchronization.EnhancedEAMercury)

sentMessageCh := make(chan []byte)
ingressClient.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) {
sentMessageCh <- args[1].([]byte)
})

lggr, _ := logger.TestLoggerObserved(t, zap.WarnLevel)
chTelem := make(chan EnhancedTelemetryMercuryData, 100)
chDone := make(chan struct{})
feedID := common.HexToHash("0x0004")
e := EnhancedTelemetryService[EnhancedTelemetryMercuryData]{
chDone: chDone,
chTelem: chTelem,
job: &job.Job{
Type: job.Type(pipeline.OffchainReporting2JobType),
OCR2OracleSpec: &job.OCR2OracleSpec{
CaptureEATelemetry: true,
FeedID: &feedID,
},
},
lggr: lggr,
monitoringEndpoint: monitoringEndpoint,
}
servicetest.Run(t, &e)

chTelem <- EnhancedTelemetryMercuryData{
TaskRunResults: trrsMercuryV4,
FeedVersion: 4,
V4Observation: &mercuryv4.Observation{
BenchmarkPrice: mercury.ObsResult[*big.Int]{Val: big.NewInt(111111)},
MarketStatus: mercury.ObsResult[uint32]{Val: 2},
MaxFinalizedTimestamp: mercury.ObsResult[int64]{Val: 321},
LinkPrice: mercury.ObsResult[*big.Int]{Val: big.NewInt(4321)},
NativePrice: mercury.ObsResult[*big.Int]{Val: big.NewInt(54321)},
},
RepTimestamp: types.ReportTimestamp{
ConfigDigest: types.ConfigDigest{2},
Epoch: 11,
Round: 22,
},
}

expectedPricingTelemetry := telem.EnhancedEAMercury{
DataSource: "data-source-name",
DpBenchmarkPrice: 123456.123456,
BridgeTaskRunStartedTimestamp: trrsMercuryV4[0].CreatedAt.UnixMilli(),
BridgeTaskRunEndedTimestamp: trrsMercuryV4[0].FinishedAt.Time.UnixMilli(),
ProviderRequestedTimestamp: 92233720368547760,
ProviderReceivedTimestamp: -92233720368547760,
ProviderDataStreamEstablished: 1,
ProviderIndicatedTime: -123456789,
Feed: common.HexToHash("0x0004").String(),
ObservationBenchmarkPrice: 111111,
ObservationMarketStatus: 2,
ConfigDigest: "0200000000000000000000000000000000000000000000000000000000000000",
Round: 22,
Epoch: 11,
AssetSymbol: "USD/LINK",
ObservationBenchmarkPriceString: "111111",
MaxFinalizedTimestamp: 321,
LinkPrice: 4321,
NativePrice: 54321,
Version: 4,
BridgeRequestData: `{"data":{"to":"LINK","from":"USD"}}`,
}
expectedPricingMessage, _ := proto.Marshal(&expectedPricingTelemetry)
require.Equal(t, expectedPricingMessage, <-sentMessageCh)

chDone <- struct{}{}

// Verify that no other telemetry is sent.
require.Len(t, sentMessageCh, 0)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ message EnhancedEAMercury {

string feed=14;

// v1+v2+v3
// v1+v2+v3+v4
int64 observation_benchmark_price=15; // This value overflows, will be reserved and removed in future versions
string observation_benchmark_price_string = 22;
// v1+v3
Expand Down

0 comments on commit 830cfe7

Please sign in to comment.