diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 2086eaa6a80..a8197111bd5 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -553,8 +553,8 @@ func (d *Delegate) newServicesMercury( mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, chain, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID)) - if ocrcommon.ShouldCollectEnhancedTelemetryMercury(&jb) { - enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(spec.FeedID.String(), synchronization.EnhancedEAMercury), lggr.Named("EnhancedTelemetryMercury")) + if ocrcommon.ShouldCollectEnhancedTelemetryMercury(jb) { + enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(spec.FeedID.String(), synchronization.EnhancedEAMercury, rid.Network, rid.ChainID), lggr.Named("EnhancedTelemetryMercury")) mercuryServices = append(mercuryServices, enhancedTelemService) } diff --git a/core/services/ocrcommon/telemetry.go b/core/services/ocrcommon/telemetry.go index 54a5002093d..2e1d733c1c2 100644 --- a/core/services/ocrcommon/telemetry.go +++ b/core/services/ocrcommon/telemetry.go @@ -4,9 +4,9 @@ import ( "context" "encoding/json" "fmt" + "math/big" "github.com/ethereum/go-ethereum/common" - "github.com/smartcontractkit/libocr/commontypes" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "google.golang.org/protobuf/proto" @@ -14,10 +14,13 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils" "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" "github.com/smartcontractkit/chainlink/v2/core/utils" relaymercuryv1 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v1" + relaymercuryv2 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v2" + relaymercuryv3 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v3" ) type eaTelemetry struct { @@ -35,9 +38,15 @@ type EnhancedTelemetryData struct { } type EnhancedTelemetryMercuryData struct { - TaskRunResults pipeline.TaskRunResults - Observation relaymercuryv1.Observation - RepTimestamp ocrtypes.ReportTimestamp + V1Observation *relaymercuryv1.Observation + V2Observation *relaymercuryv2.Observation + V3Observation *relaymercuryv3.Observation + TaskRunResults pipeline.TaskRunResults + RepTimestamp ocrtypes.ReportTimestamp + FeedVersion mercuryutils.FeedVersion + FetchMaxFinalizedTimestamp bool + IsLinkFeed bool + IsNativeFeed bool } type EnhancedTelemetryService[T EnhancedTelemetryData | EnhancedTelemetryMercuryData] struct { @@ -68,13 +77,13 @@ func (e *EnhancedTelemetryService[T]) Start(context.Context) error { for { select { case t := <-e.chTelem: - switch any(t).(type) { + switch v := any(t).(type) { case EnhancedTelemetryData: - s := any(t).(EnhancedTelemetryData) - e.collectEATelemetry(s.TaskRunResults, s.FinalResults, s.RepTimestamp) + e.collectEATelemetry(v.TaskRunResults, v.FinalResults, v.RepTimestamp) case EnhancedTelemetryMercuryData: - s := any(t).(EnhancedTelemetryMercuryData) - e.collectMercuryEnhancedTelemetry(s.Observation, s.TaskRunResults, s.RepTimestamp) + e.collectMercuryEnhancedTelemetry(v) + default: + e.lggr.Errorf("unrecognised telemetry data type: %T", t) } case <-e.chDone: return @@ -223,14 +232,19 @@ func (e *EnhancedTelemetryService[T]) collectAndSend(trrs *pipeline.TaskRunResul continue } + if trr.Result.Error != nil { + e.lggr.Warnw(fmt.Sprintf("cannot get bridge response from bridge task, job %d, id %s", e.job.ID, trr.Task.DotID()), "err", trr.Result.Error) + continue + } bridgeRawResponse, ok := trr.Result.Value.(string) if !ok { - e.lggr.Warnf("cannot get bridge response from bridge task, job %d, id %s", e.job.ID, trr.Task.DotID()) + e.lggr.Warnf("cannot parse bridge response from bridge task, job %d, id %s: expected string, got: %v (type %T)", e.job.ID, trr.Task.DotID(), trr.Result.Value, trr.Result.Value) continue } eaTelem, err := parseEATelemetry([]byte(bridgeRawResponse)) if err != nil { - e.lggr.Warnf("cannot parse EA telemetry, job %d, id %s", e.job.ID, trr.Task.DotID()) + e.lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry, job %d, id %s", e.job.ID, trr.Task.DotID()), "err", err) + continue } value := e.getParsedValue(trrs, trr) @@ -253,7 +267,7 @@ func (e *EnhancedTelemetryService[T]) collectAndSend(trrs *pipeline.TaskRunResul bytes, err := proto.Marshal(t) if err != nil { - e.lggr.Warnf("protobuf marshal failed %v", err.Error()) + e.lggr.Warnw("protobuf marshal failed", "err", err) continue } @@ -263,14 +277,81 @@ func (e *EnhancedTelemetryService[T]) collectAndSend(trrs *pipeline.TaskRunResul // collectMercuryEnhancedTelemetry checks if enhanced telemetry should be collected, fetches the information needed and // sends the telemetry -func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(obs relaymercuryv1.Observation, trrs pipeline.TaskRunResults, repts ocrtypes.ReportTimestamp) { +func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d EnhancedTelemetryMercuryData) { if e.monitoringEndpoint == nil { return } - obsBenchmarkPrice, obsBid, obsAsk, obsBlockNum, obsBlockHash, obsBlockTimestamp := e.getFinalValues(obs) + // v1 fields + var bn int64 + var bh string + var bt uint64 + // v1+v2+v3 fields + bp := big.NewInt(0) + //v1+v3 fields + bid := big.NewInt(0) + ask := big.NewInt(0) + // v2+v3 fields + var mfts, lp, np int64 + + switch { + case d.V1Observation != nil: + obs := *d.V1Observation + if obs.CurrentBlockNum.Err == nil { + bn = obs.CurrentBlockNum.Val + } + if obs.CurrentBlockHash.Err == nil { + bh = common.BytesToHash(obs.CurrentBlockHash.Val).Hex() + } + if obs.CurrentBlockTimestamp.Err == nil { + bt = obs.CurrentBlockTimestamp.Val + } + if obs.BenchmarkPrice.Err == nil && obs.BenchmarkPrice.Val != nil { + bp = obs.BenchmarkPrice.Val + } + if obs.Bid.Err == nil && obs.Bid.Val != nil { + bid = obs.Bid.Val + } + if obs.Ask.Err == nil && obs.Ask.Val != nil { + ask = obs.Ask.Val + } + case d.V2Observation != nil: + obs := *d.V2Observation + if obs.MaxFinalizedTimestamp.Err == nil { + mfts = obs.MaxFinalizedTimestamp.Val + } + if obs.LinkPrice.Err == nil && obs.LinkPrice.Val != nil { + lp = obs.LinkPrice.Val.Int64() + } + if obs.NativePrice.Err == nil && obs.NativePrice.Val != nil { + np = obs.NativePrice.Val.Int64() + } + if obs.BenchmarkPrice.Err == nil && obs.BenchmarkPrice.Val != nil { + bp = obs.BenchmarkPrice.Val + } + case d.V3Observation != nil: + obs := *d.V3Observation + if obs.MaxFinalizedTimestamp.Err == nil { + mfts = obs.MaxFinalizedTimestamp.Val + } + if obs.LinkPrice.Err == nil && obs.LinkPrice.Val != nil { + lp = obs.LinkPrice.Val.Int64() + } + if obs.NativePrice.Err == nil && obs.NativePrice.Val != nil { + np = obs.NativePrice.Val.Int64() + } + if obs.BenchmarkPrice.Err == nil && obs.BenchmarkPrice.Val != nil { + bp = obs.BenchmarkPrice.Val + } + if obs.Bid.Err == nil && obs.Bid.Val != nil { + bid = obs.Bid.Val + } + if obs.Ask.Err == nil && obs.Ask.Val != nil { + ask = obs.Ask.Val + } + } - for _, trr := range trrs { + for _, trr := range d.TaskRunResults { if trr.Task.Type() != pipeline.TaskTypeBridge { continue } @@ -287,30 +368,41 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(obs relaym } assetSymbol := e.getAssetSymbolFromRequestData(bridgeTask.RequestData) - benchmarkPrice, bidPrice, askPrice := e.getPricesFromResults(trr, &trrs) + + benchmarkPrice, bidPrice, askPrice := e.getPricesFromResults(trr, d.TaskRunResults, d.FeedVersion) t := &telem.EnhancedEAMercury{ - DataSource: eaTelem.DataSource, - DpBenchmarkPrice: benchmarkPrice, - DpBid: bidPrice, - DpAsk: askPrice, - CurrentBlockNumber: obsBlockNum, - CurrentBlockHash: common.BytesToHash(obsBlockHash).String(), - CurrentBlockTimestamp: obsBlockTimestamp, - BridgeTaskRunStartedTimestamp: trr.CreatedAt.UnixMilli(), - BridgeTaskRunEndedTimestamp: trr.FinishedAt.Time.UnixMilli(), - ProviderRequestedTimestamp: eaTelem.ProviderRequestedTimestamp, - ProviderReceivedTimestamp: eaTelem.ProviderReceivedTimestamp, - ProviderDataStreamEstablished: eaTelem.ProviderDataStreamEstablished, - ProviderIndicatedTime: eaTelem.ProviderIndicatedTime, - Feed: e.job.OCR2OracleSpec.FeedID.Hex(), - ObservationBenchmarkPrice: obsBenchmarkPrice, - ObservationBid: obsBid, - ObservationAsk: obsAsk, - ConfigDigest: repts.ConfigDigest.Hex(), - Round: int64(repts.Round), - Epoch: int64(repts.Epoch), - AssetSymbol: assetSymbol, + DataSource: eaTelem.DataSource, + DpBenchmarkPrice: benchmarkPrice, + DpBid: bidPrice, + DpAsk: askPrice, + CurrentBlockNumber: bn, + CurrentBlockHash: bh, + CurrentBlockTimestamp: bt, + FetchMaxFinalizedTimestamp: d.FetchMaxFinalizedTimestamp, + MaxFinalizedTimestamp: mfts, + BridgeTaskRunStartedTimestamp: trr.CreatedAt.UnixMilli(), + BridgeTaskRunEndedTimestamp: trr.FinishedAt.Time.UnixMilli(), + ProviderRequestedTimestamp: eaTelem.ProviderRequestedTimestamp, + ProviderReceivedTimestamp: eaTelem.ProviderReceivedTimestamp, + ProviderDataStreamEstablished: eaTelem.ProviderDataStreamEstablished, + ProviderIndicatedTime: eaTelem.ProviderIndicatedTime, + Feed: e.job.OCR2OracleSpec.FeedID.Hex(), + ObservationBenchmarkPrice: bp.Int64(), + ObservationBid: bid.Int64(), + ObservationAsk: ask.Int64(), + ObservationBenchmarkPriceString: stringOrEmpty(bp), + ObservationBidString: stringOrEmpty(bid), + ObservationAskString: stringOrEmpty(ask), + IsLinkFeed: d.IsLinkFeed, + LinkPrice: lp, + IsNativeFeed: d.IsNativeFeed, + NativePrice: np, + ConfigDigest: d.RepTimestamp.ConfigDigest.Hex(), + Round: int64(d.RepTimestamp.Round), + Epoch: int64(d.RepTimestamp.Epoch), + AssetSymbol: assetSymbol, + Version: uint32(d.FeedVersion), } bytes, err := proto.Marshal(t) @@ -343,19 +435,19 @@ func (e *EnhancedTelemetryService[T]) getAssetSymbolFromRequestData(requestData } // ShouldCollectEnhancedTelemetryMercury checks if enhanced telemetry should be collected and sent -func ShouldCollectEnhancedTelemetryMercury(job *job.Job) bool { - if job.Type.String() == pipeline.OffchainReporting2JobType && job.OCR2OracleSpec != nil { - return job.OCR2OracleSpec.CaptureEATelemetry +func ShouldCollectEnhancedTelemetryMercury(jb job.Job) bool { + if jb.Type.String() == pipeline.OffchainReporting2JobType && jb.OCR2OracleSpec != nil { + return jb.OCR2OracleSpec.CaptureEATelemetry } return false } // getPricesFromResults parses the pipeline.TaskRunResults for pipeline.TaskTypeJSONParse and gets the benchmarkPrice, // bid and ask. This functions expects the pipeline.TaskRunResults to be correctly ordered -func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.TaskRunResult, allTasks *pipeline.TaskRunResults) (float64, float64, float64) { +func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) { var benchmarkPrice, askPrice, bidPrice float64 var err error - //We rely on task results to be sorted in the correct order + // We rely on task results to be sorted in the correct order benchmarkPriceTask := allTasks.GetNextTaskOf(startTask) if benchmarkPriceTask == nil { e.lggr.Warnf("cannot parse enhanced EA telemetry benchmark price, task is nil, job %d, id %s", e.job.ID) @@ -372,12 +464,18 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.Ta } } + // mercury version 2 only supports benchmarkPrice + if mercuryVersion == 2 { + return benchmarkPrice, 0, 0 + } + bidTask := allTasks.GetNextTaskOf(*benchmarkPriceTask) if bidTask == nil { e.lggr.Warnf("cannot parse enhanced EA telemetry bid price, task is nil, job %d, id %s", e.job.ID) return benchmarkPrice, 0, 0 } - if bidTask.Task.Type() == pipeline.TaskTypeJSONParse { + + if bidTask != nil && bidTask.Task.Type() == pipeline.TaskTypeJSONParse { if bidTask.Result.Error != nil { e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry bid price, job %d, id %s: %s", e.job.ID, bidTask.Task.DotID(), bidTask.Result.Error), "err", bidTask.Result.Error) } else { @@ -393,7 +491,7 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.Ta e.lggr.Warnf("cannot parse enhanced EA telemetry ask price, task is nil, job %d, id %s", e.job.ID) return benchmarkPrice, bidPrice, 0 } - if askTask.Task.Type() == pipeline.TaskTypeJSONParse { + if askTask != nil && askTask.Task.Type() == pipeline.TaskTypeJSONParse { if bidTask.Result.Error != nil { e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry ask price, job %d, id %s: %s", e.job.ID, askTask.Task.DotID(), askTask.Result.Error), "err", askTask.Result.Error) } else { @@ -407,21 +505,11 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.Ta return benchmarkPrice, bidPrice, askPrice } -// getFinalValues runs a parse on the pipeline.TaskRunResults and returns the values -func (e *EnhancedTelemetryService[T]) getFinalValues(obs relaymercuryv1.Observation) (int64, int64, int64, int64, []byte, uint64) { - var benchmarkPrice, bid, ask int64 - - if obs.BenchmarkPrice.Val != nil { - benchmarkPrice = obs.BenchmarkPrice.Val.Int64() - } - if obs.Bid.Val != nil { - bid = obs.Bid.Val.Int64() - } - if obs.Ask.Val != nil { - ask = obs.Ask.Val.Int64() +// MaybeEnqueueEnhancedTelem sends data to the telemetry channel for processing +func MaybeEnqueueEnhancedTelem(jb job.Job, ch chan<- EnhancedTelemetryMercuryData, data EnhancedTelemetryMercuryData) { + if ShouldCollectEnhancedTelemetryMercury(jb) { + EnqueueEnhancedTelem[EnhancedTelemetryMercuryData](ch, data) } - - return benchmarkPrice, bid, ask, obs.CurrentBlockNum.Val, obs.CurrentBlockHash.Val, obs.CurrentBlockTimestamp.Val } // EnqueueEnhancedTelem sends data to the telemetry channel for processing @@ -441,3 +529,10 @@ func getResultFloat64(task *pipeline.TaskRunResult) (float64, error) { resultFloat64, _ := result.Float64() return resultFloat64, nil } + +func stringOrEmpty(n *big.Int) string { + if n.Cmp(big.NewInt(0)) == 0 { + return "" + } + return n.String() +} diff --git a/core/services/ocrcommon/telemetry_test.go b/core/services/ocrcommon/telemetry_test.go index 495dc6fe788..413736b91f8 100644 --- a/core/services/ocrcommon/telemetry_test.go +++ b/core/services/ocrcommon/telemetry_test.go @@ -15,6 +15,7 @@ import ( "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury" mercuryv1 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v1" + mercury_v2 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v2" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -361,7 +362,7 @@ func TestCollectAndSend(t *testing.T) { wg.Wait() assert.Equal(t, logs.Len(), 2) - assert.Contains(t, logs.All()[0].Message, "cannot get bridge response from bridge task") + assert.Contains(t, logs.All()[0].Message, "cannot parse bridge response from bridge task") badTrrs = &pipeline.TaskRunResults{ pipeline.TaskRunResult{ @@ -372,20 +373,19 @@ func TestCollectAndSend(t *testing.T) { Value: "[]", }, }} - wg.Add(1) enhancedTelemChan <- EnhancedTelemetryData{ TaskRunResults: *badTrrs, FinalResults: *finalResult, RepTimestamp: observationTimestamp, } wg.Wait() - assert.Equal(t, logs.Len(), 4) - assert.Contains(t, logs.All()[2].Message, "cannot parse EA telemetry") - assert.Contains(t, logs.All()[3].Message, "cannot get json parse value") + assert.Equal(t, 2, logs.Len()) + assert.Contains(t, logs.All()[0].Message, "cannot parse bridge response from bridge task") + assert.Contains(t, logs.All()[1].Message, "cannot get json parse value") doneCh <- struct{}{} } -var trrsMercury = pipeline.TaskRunResults{ +var trrsMercuryV1 = pipeline.TaskRunResults{ pipeline.TaskRunResult{ Task: &pipeline.BridgeTask{ BaseTask: pipeline.NewBaseTask(0, "ds1", nil, nil, 0), @@ -421,32 +421,24 @@ var trrsMercury = pipeline.TaskRunResults{ }, } -func TestGetFinalValues(t *testing.T) { - e := EnhancedTelemetryService[EnhancedTelemetryMercuryData]{} - o := mercuryv1.Observation{ - BenchmarkPrice: mercury.ObsResult[*big.Int]{Val: big.NewInt(111111)}, - Bid: mercury.ObsResult[*big.Int]{Val: big.NewInt(222222)}, - Ask: mercury.ObsResult[*big.Int]{Val: big.NewInt(333333)}, - CurrentBlockNum: mercury.ObsResult[int64]{Val: 123456789}, - CurrentBlockHash: mercury.ObsResult[[]byte]{Val: common.HexToHash("0x123321").Bytes()}, - CurrentBlockTimestamp: mercury.ObsResult[uint64]{Val: 987654321}, - } - - benchmarkPrice, bid, ask, blockNr, blockHash, blockTimestamp := e.getFinalValues(o) - require.Equal(t, benchmarkPrice, int64(111111)) - require.Equal(t, bid, int64(222222)) - require.Equal(t, ask, int64(333333)) - require.Equal(t, blockNr, int64(123456789)) - require.Equal(t, blockHash, common.HexToHash("0x123321").Bytes()) - require.Equal(t, blockTimestamp, uint64(987654321)) - - benchmarkPrice, bid, ask, blockNr, blockHash, blockTimestamp = e.getFinalValues(mercuryv1.Observation{}) - require.Equal(t, benchmarkPrice, int64(0)) - require.Equal(t, bid, int64(0)) - require.Equal(t, ask, int64(0)) - require.Equal(t, blockNr, int64(0)) - require.Nil(t, blockHash) - require.Equal(t, blockTimestamp, uint64(0)) +var trrsMercuryV2 = pipeline.TaskRunResults{ + pipeline.TaskRunResult{ + Task: &pipeline.BridgeTask{ + BaseTask: pipeline.NewBaseTask(0, "ds1", nil, nil, 0), + RequestData: `{"data":{"to":"LINK","from":"USD"}}`, + }, + Result: pipeline.Result{ + Value: bridgeResponse, + }, + }, + pipeline.TaskRunResult{ + Task: &pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(1, "ds1_benchmark", nil, nil, 1), + }, + Result: pipeline.Result{ + Value: float64(123456.123456), + }, + }, } func TestGetPricesFromResults(t *testing.T) { @@ -458,25 +450,25 @@ func TestGetPricesFromResults(t *testing.T) { }, } - benchmarkPrice, bid, ask := e.getPricesFromResults(trrsMercury[0], &trrsMercury) + benchmarkPrice, bid, ask := e.getPricesFromResults(trrsMercuryV1[0], trrsMercuryV1, 1) require.Equal(t, 123456.123456, benchmarkPrice) require.Equal(t, 1234567.1234567, bid) require.Equal(t, float64(321123), ask) - benchmarkPrice, bid, ask = e.getPricesFromResults(trrsMercury[0], &pipeline.TaskRunResults{}) + benchmarkPrice, bid, ask = e.getPricesFromResults(trrsMercuryV1[0], pipeline.TaskRunResults{}, 1) require.Equal(t, float64(0), benchmarkPrice) require.Equal(t, float64(0), bid) require.Equal(t, float64(0), ask) require.Equal(t, 1, logs.Len()) require.Contains(t, logs.All()[0].Message, "cannot parse enhanced EA telemetry") - tt := trrsMercury[:2] - e.getPricesFromResults(trrsMercury[0], &tt) + tt := trrsMercuryV1[:2] + e.getPricesFromResults(trrsMercuryV1[0], tt, 1) require.Equal(t, 2, logs.Len()) require.Contains(t, logs.All()[1].Message, "cannot parse enhanced EA telemetry bid price, task is nil") - tt = trrsMercury[:3] - e.getPricesFromResults(trrsMercury[0], &tt) + tt = trrsMercuryV1[:3] + e.getPricesFromResults(trrsMercuryV1[0], tt, 1) require.Equal(t, 3, logs.Len()) require.Contains(t, logs.All()[2].Message, "cannot parse enhanced EA telemetry ask price, task is nil") @@ -513,7 +505,7 @@ func TestGetPricesFromResults(t *testing.T) { Value: nil, }, }} - benchmarkPrice, bid, ask = e.getPricesFromResults(trrsMercury[0], &trrs2) + benchmarkPrice, bid, ask = e.getPricesFromResults(trrsMercuryV1[0], trrs2, 3) require.Equal(t, benchmarkPrice, float64(0)) require.Equal(t, bid, float64(0)) require.Equal(t, ask, float64(0)) @@ -521,11 +513,16 @@ func TestGetPricesFromResults(t *testing.T) { require.Contains(t, logs.All()[3].Message, "cannot parse enhanced EA telemetry benchmark price") require.Contains(t, logs.All()[4].Message, "cannot parse enhanced EA telemetry bid price") require.Contains(t, logs.All()[5].Message, "cannot parse enhanced EA telemetry ask price") + + benchmarkPrice, bid, ask = e.getPricesFromResults(trrsMercuryV1[0], trrsMercuryV2, 2) + require.Equal(t, 123456.123456, benchmarkPrice) + require.Equal(t, float64(0), bid) + require.Equal(t, float64(0), ask) } func TestShouldCollectEnhancedTelemetryMercury(t *testing.T) { - j := &job.Job{ + j := job.Job{ Type: job.Type(pipeline.OffchainReporting2JobType), OCR2OracleSpec: &job.OCR2OracleSpec{ CaptureEATelemetry: true, @@ -547,7 +544,7 @@ func TestGetAssetSymbolFromRequestData(t *testing.T) { require.Equal(t, e.getAssetSymbolFromRequestData(reqData), "USD/LINK") } -func TestCollectMercuryEnhancedTelemetry(t *testing.T) { +func TestCollectMercuryEnhancedTelemetryV1(t *testing.T) { wg := sync.WaitGroup{} ingressClient := mocks.NewTelemetryIngressClient(t) ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient) @@ -581,8 +578,8 @@ func TestCollectMercuryEnhancedTelemetry(t *testing.T) { wg.Add(1) chTelem <- EnhancedTelemetryMercuryData{ - TaskRunResults: trrsMercury, - Observation: mercuryv1.Observation{ + TaskRunResults: trrsMercuryV1, + V1Observation: &mercuryv1.Observation{ BenchmarkPrice: mercury.ObsResult[*big.Int]{Val: big.NewInt(111111)}, Bid: mercury.ObsResult[*big.Int]{Val: big.NewInt(222222)}, Ask: mercury.ObsResult[*big.Int]{Val: big.NewInt(333333)}, @@ -598,27 +595,30 @@ func TestCollectMercuryEnhancedTelemetry(t *testing.T) { } expectedTelemetry := telem.EnhancedEAMercury{ - DataSource: "data-source-name", - DpBenchmarkPrice: 123456.123456, - DpBid: 1234567.1234567, - DpAsk: 321123, - CurrentBlockNumber: 123456789, - CurrentBlockHash: common.HexToHash("0x123321").String(), - CurrentBlockTimestamp: 987654321, - BridgeTaskRunStartedTimestamp: trrsMercury[0].CreatedAt.UnixMilli(), - BridgeTaskRunEndedTimestamp: trrsMercury[0].FinishedAt.Time.UnixMilli(), - ProviderRequestedTimestamp: 92233720368547760, - ProviderReceivedTimestamp: -92233720368547760, - ProviderDataStreamEstablished: 1, - ProviderIndicatedTime: -123456789, - Feed: common.HexToHash("0x111").String(), - ObservationBenchmarkPrice: 111111, - ObservationBid: 222222, - ObservationAsk: 333333, - ConfigDigest: "0200000000000000000000000000000000000000000000000000000000000000", - Round: 22, - Epoch: 11, - AssetSymbol: "USD/LINK", + DataSource: "data-source-name", + DpBenchmarkPrice: 123456.123456, + DpBid: 1234567.1234567, + DpAsk: 321123, + CurrentBlockNumber: 123456789, + CurrentBlockHash: common.HexToHash("0x123321").String(), + CurrentBlockTimestamp: 987654321, + BridgeTaskRunStartedTimestamp: trrsMercuryV1[0].CreatedAt.UnixMilli(), + BridgeTaskRunEndedTimestamp: trrsMercuryV1[0].FinishedAt.Time.UnixMilli(), + ProviderRequestedTimestamp: 92233720368547760, + ProviderReceivedTimestamp: -92233720368547760, + ProviderDataStreamEstablished: 1, + ProviderIndicatedTime: -123456789, + Feed: common.HexToHash("0x111").String(), + ObservationBenchmarkPrice: 111111, + ObservationBid: 222222, + ObservationAsk: 333333, + ConfigDigest: "0200000000000000000000000000000000000000000000000000000000000000", + Round: 22, + Epoch: 11, + AssetSymbol: "USD/LINK", + ObservationBenchmarkPriceString: "111111", + ObservationBidString: "222222", + ObservationAskString: "333333", } expectedMessage, _ := proto.Marshal(&expectedTelemetry) @@ -634,7 +634,7 @@ func TestCollectMercuryEnhancedTelemetry(t *testing.T) { Value: nil, }}, }, - Observation: mercuryv1.Observation{}, + V1Observation: &mercuryv1.Observation{}, RepTimestamp: types.ReportTimestamp{ ConfigDigest: types.ConfigDigest{2}, Epoch: 11, @@ -642,10 +642,10 @@ func TestCollectMercuryEnhancedTelemetry(t *testing.T) { }, } wg.Add(1) - trrsMercury[0].Result.Value = "" + trrsMercuryV1[0].Result.Value = "" chTelem <- EnhancedTelemetryMercuryData{ - TaskRunResults: trrsMercury, - Observation: mercuryv1.Observation{}, + TaskRunResults: trrsMercuryV1, + V1Observation: &mercuryv1.Observation{}, RepTimestamp: types.ReportTimestamp{ ConfigDigest: types.ConfigDigest{2}, Epoch: 11, @@ -659,3 +659,119 @@ func TestCollectMercuryEnhancedTelemetry(t *testing.T) { require.Contains(t, logs.All()[1].Message, "cannot parse EA telemetry") chDone <- struct{}{} } + +func TestCollectMercuryEnhancedTelemetryV2(t *testing.T) { + wg := sync.WaitGroup{} + ingressClient := mocks.NewTelemetryService(t) + ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient) + monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("0xa", synchronization.EnhancedEAMercury, "test-network", "test-chainID") + + var sentMessage []byte + ingressClient.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) { + sentMessage = args[1].([]byte) + wg.Done() + }) + + lggr, logs := logger.TestLoggerObserved(t, zap.WarnLevel) + chTelem := make(chan EnhancedTelemetryMercuryData, 100) + chDone := make(chan struct{}) + feedID := common.HexToHash("0x111") + e := EnhancedTelemetryService[EnhancedTelemetryMercuryData]{ + chDone: chDone, + chTelem: chTelem, + job: &job.Job{ + Type: job.Type(pipeline.OffchainReporting2JobType), + OCR2OracleSpec: &job.OCR2OracleSpec{ + CaptureEATelemetry: true, + FeedID: &feedID, + }, + }, + lggr: lggr, + monitoringEndpoint: monitoringEndpoint, + } + require.NoError(t, e.Start(testutils.Context(t))) + + wg.Add(1) + + chTelem <- EnhancedTelemetryMercuryData{ + TaskRunResults: trrsMercuryV2, + V2Observation: &mercury_v2.Observation{ + BenchmarkPrice: mercury.ObsResult[*big.Int]{Val: big.NewInt(111111)}, + MaxFinalizedTimestamp: mercury.ObsResult[int64]{Val: 321}, + LinkPrice: mercury.ObsResult[*big.Int]{Val: big.NewInt(4321)}, + NativePrice: mercury.ObsResult[*big.Int]{Val: big.NewInt(54321)}, + }, + RepTimestamp: types.ReportTimestamp{ + ConfigDigest: types.ConfigDigest{2}, + Epoch: 11, + Round: 22, + }, + } + + expectedTelemetry := telem.EnhancedEAMercury{ + DataSource: "data-source-name", + DpBenchmarkPrice: 123456.123456, + CurrentBlockNumber: 0, + CurrentBlockHash: "", + CurrentBlockTimestamp: 0, + BridgeTaskRunStartedTimestamp: trrsMercuryV1[0].CreatedAt.UnixMilli(), + BridgeTaskRunEndedTimestamp: trrsMercuryV1[0].FinishedAt.Time.UnixMilli(), + ProviderRequestedTimestamp: 92233720368547760, + ProviderReceivedTimestamp: -92233720368547760, + ProviderDataStreamEstablished: 1, + ProviderIndicatedTime: -123456789, + Feed: common.HexToHash("0x111").String(), + ObservationBenchmarkPrice: 111111, + ObservationBid: 0, + ObservationAsk: 0, + ConfigDigest: "0200000000000000000000000000000000000000000000000000000000000000", + Round: 22, + Epoch: 11, + AssetSymbol: "USD/LINK", + ObservationBenchmarkPriceString: "111111", + MaxFinalizedTimestamp: 321, + LinkPrice: 4321, + NativePrice: 54321, + } + + expectedMessage, _ := proto.Marshal(&expectedTelemetry) + wg.Wait() + + require.Equal(t, expectedMessage, sentMessage) + + chTelem <- EnhancedTelemetryMercuryData{ + TaskRunResults: pipeline.TaskRunResults{ + pipeline.TaskRunResult{Task: &pipeline.BridgeTask{ + BaseTask: pipeline.NewBaseTask(0, "ds1", nil, nil, 0), + }, + Result: pipeline.Result{ + Value: nil, + }}, + }, + V2Observation: &mercury_v2.Observation{}, + RepTimestamp: types.ReportTimestamp{ + ConfigDigest: types.ConfigDigest{2}, + Epoch: 11, + Round: 22, + }, + } + wg.Add(1) + trrsMercuryV2[0].Result.Value = "" + chTelem <- EnhancedTelemetryMercuryData{ + TaskRunResults: trrsMercuryV2, + V2Observation: &mercury_v2.Observation{}, + RepTimestamp: types.ReportTimestamp{ + ConfigDigest: types.ConfigDigest{2}, + Epoch: 11, + Round: 22, + }, + } + + wg.Wait() + require.Equal(t, 4, logs.Len()) + require.Contains(t, logs.All()[0].Message, "cannot parse enhanced EA telemetry bid price") + require.Contains(t, logs.All()[1].Message, "cannot get bridge response from bridge task") + require.Contains(t, logs.All()[2].Message, "cannot parse EA telemetry") + require.Contains(t, logs.All()[3].Message, "cannot parse enhanced EA telemetry bid price") + chDone <- struct{}{} +} diff --git a/core/services/relay/evm/mercury/orm.go b/core/services/relay/evm/mercury/orm.go index dd7d7b33e74..7273519f6b6 100644 --- a/core/services/relay/evm/mercury/orm.go +++ b/core/services/relay/evm/mercury/orm.go @@ -23,8 +23,8 @@ import ( type ORM interface { InsertTransmitRequest(req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext, qopts ...pg.QOpt) error DeleteTransmitRequests(reqs []*pb.TransmitRequest, qopts ...pg.QOpt) error - GetTransmitRequests(qopts ...pg.QOpt) ([]*Transmission, error) - PruneTransmitRequests(maxSize int, qopts ...pg.QOpt) error + GetTransmitRequests(jobID int32, qopts ...pg.QOpt) ([]*Transmission, error) + PruneTransmitRequests(jobID int32, maxSize int, qopts ...pg.QOpt) error LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg.QOpt) (report []byte, err error) } @@ -49,6 +49,11 @@ func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) ORM { // InsertTransmitRequest inserts one transmit request if the payload does not exist already. func (o *orm) InsertTransmitRequest(req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext, qopts ...pg.QOpt) error { + feedID, err := FeedIDFromReport(req.Payload) + if err != nil { + return err + } + q := o.q.WithOpts(qopts...) var wg sync.WaitGroup wg.Add(2) @@ -57,16 +62,12 @@ func (o *orm) InsertTransmitRequest(req *pb.TransmitRequest, jobID int32, report go func() { defer wg.Done() err1 = q.ExecQ(` - INSERT INTO mercury_transmit_requests (payload, payload_hash, config_digest, epoch, round, extra_hash, job_id) - VALUES ($1, $2, $3, $4, $5, $6, $7) + INSERT INTO mercury_transmit_requests (payload, payload_hash, config_digest, epoch, round, extra_hash, job_id, feed_id) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (payload_hash) DO NOTHING - `, req.Payload, hashPayload(req.Payload), reportCtx.ConfigDigest[:], reportCtx.Epoch, reportCtx.Round, reportCtx.ExtraHash[:], jobID) + `, req.Payload, hashPayload(req.Payload), reportCtx.ConfigDigest[:], reportCtx.Epoch, reportCtx.Round, reportCtx.ExtraHash[:], jobID, feedID[:]) }() - feedID, err := FeedIDFromReport(req.Payload) - if err != nil { - return err - } go func() { defer wg.Done() err2 = q.ExecQ(` @@ -101,15 +102,16 @@ func (o *orm) DeleteTransmitRequests(reqs []*pb.TransmitRequest, qopts ...pg.QOp } // GetTransmitRequests returns all transmit requests in chronologically descending order. -func (o *orm) GetTransmitRequests(qopts ...pg.QOpt) ([]*Transmission, error) { +func (o *orm) GetTransmitRequests(jobID int32, qopts ...pg.QOpt) ([]*Transmission, error) { q := o.q.WithOpts(qopts...) // The priority queue uses epoch and round to sort transmissions so order by // the same fields here for optimal insertion into the pq. rows, err := q.QueryContext(q.ParentCtx, ` SELECT payload, config_digest, epoch, round, extra_hash FROM mercury_transmit_requests + WHERE job_id = $1 ORDER BY epoch DESC, round DESC - `) + `, jobID) if err != nil { return nil, err } @@ -142,20 +144,22 @@ func (o *orm) GetTransmitRequests(qopts ...pg.QOpt) ([]*Transmission, error) { return transmissions, nil } -// PruneTransmitRequests keeps at most maxSize rows in the table, deleting the -// oldest transactions. -func (o *orm) PruneTransmitRequests(maxSize int, qopts ...pg.QOpt) error { +// PruneTransmitRequests keeps at most maxSize rows for the given job ID, +// deleting the oldest transactions. +func (o *orm) PruneTransmitRequests(jobID int32, maxSize int, qopts ...pg.QOpt) error { q := o.q.WithOpts(qopts...) // Prune the oldest requests by epoch and round. return q.ExecQ(` DELETE FROM mercury_transmit_requests - WHERE payload_hash NOT IN ( + WHERE job_id = $1 AND + payload_hash NOT IN ( SELECT payload_hash FROM mercury_transmit_requests + WHERE job_id = $1 ORDER BY epoch DESC, round DESC - LIMIT $1 + LIMIT $2 ) - `, maxSize) + `, jobID, maxSize) } func (o *orm) LatestReport(ctx context.Context, feedID [32]byte, qopts ...pg.QOpt) (report []byte, err error) { diff --git a/core/services/relay/evm/mercury/orm_test.go b/core/services/relay/evm/mercury/orm_test.go index a6a72327677..56dea70417b 100644 --- a/core/services/relay/evm/mercury/orm_test.go +++ b/core/services/relay/evm/mercury/orm_test.go @@ -3,6 +3,7 @@ package mercury import ( "testing" + "github.com/cometbft/cometbft/libs/rand" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -16,7 +17,7 @@ import ( func TestORM(t *testing.T) { db := pgtest.NewSqlxDB(t) - var jobID int32 // foreign key constraints disabled so can leave as 0 + jobID := rand.Int32() // foreign key constraints disabled so value doesn't matter pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) lggr := logger.TestLogger(t) @@ -48,7 +49,7 @@ func TestORM(t *testing.T) { err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[2]}, jobID, reportContexts[2]) require.NoError(t, err) - transmissions, err := orm.GetTransmitRequests() + transmissions, err := orm.GetTransmitRequests(jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: reportContexts[2]}, @@ -65,7 +66,7 @@ func TestORM(t *testing.T) { err = orm.DeleteTransmitRequests([]*pb.TransmitRequest{{Payload: reports[1]}}) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests() + transmissions, err = orm.GetTransmitRequests(jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: reportContexts[2]}, @@ -80,7 +81,7 @@ func TestORM(t *testing.T) { err = orm.DeleteTransmitRequests([]*pb.TransmitRequest{{Payload: []byte("does-not-exist")}}) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests() + transmissions, err = orm.GetTransmitRequests(jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: reportContexts[2]}, @@ -98,7 +99,7 @@ func TestORM(t *testing.T) { require.NoError(t, err) assert.Equal(t, reports[2], l) - transmissions, err = orm.GetTransmitRequests() + transmissions, err = orm.GetTransmitRequests(jobID) require.NoError(t, err) require.Empty(t, transmissions) @@ -106,7 +107,7 @@ func TestORM(t *testing.T) { err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests() + transmissions, err = orm.GetTransmitRequests(jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[3]}, ReportCtx: reportContexts[3]}, @@ -118,7 +119,7 @@ func TestORM(t *testing.T) { err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests() + transmissions, err = orm.GetTransmitRequests(jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[3]}, ReportCtx: reportContexts[3]}, @@ -131,7 +132,7 @@ func TestORM(t *testing.T) { func TestORM_PruneTransmitRequests(t *testing.T) { db := pgtest.NewSqlxDB(t) - var jobID int32 // foreign key constraints disabled so can leave as 0 + jobID := rand.Int32() // foreign key constraints disabled so value doesn't matter pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) @@ -157,10 +158,10 @@ func TestORM_PruneTransmitRequests(t *testing.T) { require.NoError(t, err) // Max size greater than table size, expect no-op - err = orm.PruneTransmitRequests(5) + err = orm.PruneTransmitRequests(jobID, 5) require.NoError(t, err) - transmissions, err := orm.GetTransmitRequests() + transmissions, err := orm.GetTransmitRequests(jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: makeReportContext(1, 2)}, @@ -168,37 +169,48 @@ func TestORM_PruneTransmitRequests(t *testing.T) { }) // Max size equal to table size, expect no-op - err = orm.PruneTransmitRequests(2) + err = orm.PruneTransmitRequests(jobID, 2) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests() + transmissions, err = orm.GetTransmitRequests(jobID) require.NoError(t, err) require.Equal(t, transmissions, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: makeReportContext(1, 2)}, {Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: makeReportContext(1, 1)}, }) + // Max size is table size + 1, but jobID differs, expect no-op + err = orm.PruneTransmitRequests(-1, 2) + require.NoError(t, err) + + transmissions, err = orm.GetTransmitRequests(jobID) + require.NoError(t, err) + require.Equal(t, []*Transmission{ + {Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: makeReportContext(1, 2)}, + {Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: makeReportContext(1, 1)}, + }, transmissions) + err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(2, 1)) require.NoError(t, err) err = orm.InsertTransmitRequest(&pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 2)) require.NoError(t, err) - // Max size is table size + 1, expect the oldest row to be pruned. - err = orm.PruneTransmitRequests(3) + // Max size is table size - 1, expect the oldest row to be pruned. + err = orm.PruneTransmitRequests(jobID, 3) require.NoError(t, err) - transmissions, err = orm.GetTransmitRequests() + transmissions, err = orm.GetTransmitRequests(jobID) require.NoError(t, err) - require.Equal(t, transmissions, []*Transmission{ + require.Equal(t, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[3]}, ReportCtx: makeReportContext(2, 2)}, {Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: makeReportContext(2, 1)}, {Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: makeReportContext(1, 2)}, - }) + }, transmissions) } func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) { db := pgtest.NewSqlxDB(t) - var jobID int32 // foreign key constraints disabled so can leave as 0 + jobID := rand.Int32() // foreign key constraints disabled so value doesn't matter pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) diff --git a/core/services/relay/evm/mercury/persistence_manager.go b/core/services/relay/evm/mercury/persistence_manager.go index 9e8df72a155..1c8dad45301 100644 --- a/core/services/relay/evm/mercury/persistence_manager.go +++ b/core/services/relay/evm/mercury/persistence_manager.go @@ -78,7 +78,7 @@ func (pm *PersistenceManager) AsyncDelete(req *pb.TransmitRequest) { } func (pm *PersistenceManager) Load(ctx context.Context) ([]*Transmission, error) { - return pm.orm.GetTransmitRequests(pg.WithParentCtx(ctx)) + return pm.orm.GetTransmitRequests(pm.jobID, pg.WithParentCtx(ctx)) } func (pm *PersistenceManager) runFlushDeletesLoop() { @@ -118,7 +118,7 @@ func (pm *PersistenceManager) runPruneLoop() { ticker.Stop() return case <-ticker.C: - if err := pm.orm.PruneTransmitRequests(pm.maxTransmitQueueSize, pg.WithParentCtx(ctx), pg.WithLongQueryTimeout()); err != nil { + if err := pm.orm.PruneTransmitRequests(pm.jobID, pm.maxTransmitQueueSize, pg.WithParentCtx(ctx), pg.WithLongQueryTimeout()); err != nil { pm.lggr.Errorw("Failed to prune transmit requests table", "err", err) } else { pm.lggr.Debugw("Pruned transmit requests table") diff --git a/core/services/relay/evm/mercury/persistence_manager_test.go b/core/services/relay/evm/mercury/persistence_manager_test.go index 97628ed9c2b..d185a64a8f1 100644 --- a/core/services/relay/evm/mercury/persistence_manager_test.go +++ b/core/services/relay/evm/mercury/persistence_manager_test.go @@ -5,7 +5,10 @@ import ( "testing" "time" + "github.com/cometbft/cometbft/libs/rand" ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/sqlx" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" @@ -16,19 +19,22 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" ) -func bootstrapPersistenceManager(t *testing.T) (*PersistenceManager, *observer.ObservedLogs) { +func bootstrapPersistenceManager(t *testing.T, jobID int32, db *sqlx.DB) (*PersistenceManager, *observer.ObservedLogs) { t.Helper() - db := pgtest.NewSqlxDB(t) - pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) - pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.DebugLevel) orm := NewORM(db, lggr, pgtest.NewQConfig(true)) - return NewPersistenceManager(lggr, orm, 0, 2, 5*time.Millisecond, 5*time.Millisecond), observedLogs + return NewPersistenceManager(lggr, orm, jobID, 2, 5*time.Millisecond, 5*time.Millisecond), observedLogs } func TestPersistenceManager(t *testing.T) { + jobID1 := rand.Int32() + jobID2 := jobID1 + 1 + ctx := context.Background() - pm, _ := bootstrapPersistenceManager(t) + db := pgtest.NewSqlxDB(t) + pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) + pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) + pm, _ := bootstrapPersistenceManager(t, jobID1, db) reports := sampleReports @@ -52,11 +58,23 @@ func TestPersistenceManager(t *testing.T) { require.Equal(t, []*Transmission{ {Req: &pb.TransmitRequest{Payload: reports[1]}}, }, transmissions) + + t.Run("scopes load to only transmissions with matching job ID", func(t *testing.T) { + pm2, _ := bootstrapPersistenceManager(t, jobID2, db) + transmissions, err = pm2.Load(ctx) + require.NoError(t, err) + + assert.Len(t, transmissions, 0) + }) } func TestPersistenceManagerAsyncDelete(t *testing.T) { ctx := context.Background() - pm, observedLogs := bootstrapPersistenceManager(t) + jobID := rand.Int32() + db := pgtest.NewSqlxDB(t) + pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) + pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) + pm, observedLogs := bootstrapPersistenceManager(t, jobID, db) reports := sampleReports @@ -96,16 +114,32 @@ func TestPersistenceManagerAsyncDelete(t *testing.T) { } func TestPersistenceManagerPrune(t *testing.T) { + jobID1 := rand.Int32() + jobID2 := jobID1 + 1 + db := pgtest.NewSqlxDB(t) + pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) + pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) + ctx := context.Background() - pm, observedLogs := bootstrapPersistenceManager(t) - reports := sampleReports + reports := make([][]byte, 25) + for i := 0; i < 25; i++ { + reports[i] = buildSampleV1Report(int64(i)) + } - err := pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[0]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 1}}) + pm2, _ := bootstrapPersistenceManager(t, jobID2, db) + for i := 0; i < 20; i++ { + err := pm2.Insert(ctx, &pb.TransmitRequest{Payload: reports[i]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: uint32(i)}}) + require.NoError(t, err) + } + + pm, observedLogs := bootstrapPersistenceManager(t, jobID1, db) + + err := pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[21]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 21}}) require.NoError(t, err) - err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[1]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 2}}) + err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[22]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 22}}) require.NoError(t, err) - err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[2]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 3}}) + err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[23]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 23}}) require.NoError(t, err) err = pm.Start(ctx) @@ -118,24 +152,28 @@ func TestPersistenceManagerPrune(t *testing.T) { transmissions, err := pm.Load(ctx) require.NoError(t, err) require.Equal(t, []*Transmission{ - {Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 3}}}, - {Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 2}}}, + {Req: &pb.TransmitRequest{Payload: reports[23]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 23}}}, + {Req: &pb.TransmitRequest{Payload: reports[22]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 22}}}, }, transmissions) // Test pruning stops after Close. err = pm.Close() require.NoError(t, err) - err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[3]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 4}}) + err = pm.Insert(ctx, &pb.TransmitRequest{Payload: reports[24]}, ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 24}}) require.NoError(t, err) - time.Sleep(15 * time.Millisecond) - transmissions, err = pm.Load(ctx) require.NoError(t, err) require.Equal(t, []*Transmission{ - {Req: &pb.TransmitRequest{Payload: reports[3]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 4}}}, - {Req: &pb.TransmitRequest{Payload: reports[2]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 3}}}, - {Req: &pb.TransmitRequest{Payload: reports[1]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 2}}}, + {Req: &pb.TransmitRequest{Payload: reports[24]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 24}}}, + {Req: &pb.TransmitRequest{Payload: reports[23]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 23}}}, + {Req: &pb.TransmitRequest{Payload: reports[22]}, ReportCtx: ocrtypes.ReportContext{ReportTimestamp: ocrtypes.ReportTimestamp{Epoch: 22}}}, }, transmissions) + + t.Run("prune was scoped to job ID", func(t *testing.T) { + transmissions, err = pm2.Load(ctx) + require.NoError(t, err) + assert.Len(t, transmissions, 20) + }) } diff --git a/core/services/relay/evm/mercury/transmitter.go b/core/services/relay/evm/mercury/transmitter.go index 199dbfcdf88..3d7f7542275 100644 --- a/core/services/relay/evm/mercury/transmitter.go +++ b/core/services/relay/evm/mercury/transmitter.go @@ -32,6 +32,7 @@ import ( var ( maxTransmitQueueSize = 10_000 + maxDeleteQueueSize = 10_000 transmitTimeout = 5 * time.Second ) @@ -59,6 +60,24 @@ var ( }, []string{"feedID"}, ) + transmitQueueDeleteErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mercury_transmit_queue_delete_error_count", + Help: "Running count of DB errors when trying to delete an item from the queue DB", + }, + []string{"feedID"}, + ) + transmitQueueInsertErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mercury_transmit_queue_insert_error_count", + Help: "Running count of DB errors when trying to insert an item into the queue DB", + }, + []string{"feedID"}, + ) + transmitQueuePushErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mercury_transmit_queue_push_error_count", + Help: "Running count of DB errors when trying to push an item onto the queue", + }, + []string{"feedID"}, + ) transmitServerErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "mercury_transmit_server_error_count", Help: "Number of errored transmissions that failed due to an error returned by the mercury server", @@ -98,9 +117,14 @@ type mercuryTransmitter struct { queue *TransmitQueue wg sync.WaitGroup - transmitSuccessCount prometheus.Counter - transmitDuplicateCount prometheus.Counter - transmitConnectionErrorCount prometheus.Counter + deleteQueue chan *pb.TransmitRequest + + transmitSuccessCount prometheus.Counter + transmitDuplicateCount prometheus.Counter + transmitConnectionErrorCount prometheus.Counter + transmitQueueDeleteErrorCount prometheus.Counter + transmitQueueInsertErrorCount prometheus.Counter + transmitQueuePushErrorCount prometheus.Counter } var PayloadTypes = getPayloadTypes() @@ -136,11 +160,15 @@ func NewTransmitter(lggr logger.Logger, cfgTracker ConfigTracker, rpcClient wsrp jobID, fmt.Sprintf("%x", fromAccount), make(chan (struct{})), - NewTransmitQueue(lggr, feedIDHex, maxTransmitQueueSize, nil, persistenceManager), + nil, sync.WaitGroup{}, + make(chan *pb.TransmitRequest, maxDeleteQueueSize), transmitSuccessCount.WithLabelValues(feedIDHex), transmitDuplicateCount.WithLabelValues(feedIDHex), transmitConnectionErrorCount.WithLabelValues(feedIDHex), + transmitQueueDeleteErrorCount.WithLabelValues(feedIDHex), + transmitQueueInsertErrorCount.WithLabelValues(feedIDHex), + transmitQueuePushErrorCount.WithLabelValues(feedIDHex), } } @@ -163,6 +191,8 @@ func (mt *mercuryTransmitter) Start(ctx context.Context) (err error) { return err } mt.wg.Add(1) + go mt.runDeleteQueueLoop() + mt.wg.Add(1) go mt.runQueueLoop() return nil }) @@ -193,6 +223,46 @@ func (mt *mercuryTransmitter) HealthReport() map[string]error { return report } +func (mt *mercuryTransmitter) runDeleteQueueLoop() { + defer mt.wg.Done() + runloopCtx, cancel := mt.stopCh.Ctx(context.Background()) + defer cancel() + + // Exponential backoff for very rarely occurring errors (DB disconnect etc) + b := backoff.Backoff{ + Min: 1 * time.Second, + Max: 120 * time.Second, + Factor: 2, + Jitter: true, + } + + for { + select { + case req := <-mt.deleteQueue: + for { + if err := mt.persistenceManager.Delete(runloopCtx, req); err != nil { + mt.lggr.Errorw("Failed to delete transmit request record", "error", err, "req", req) + mt.transmitQueueDeleteErrorCount.Inc() + select { + case <-time.After(b.Duration()): + // Wait a backoff duration before trying to delete again + continue + case <-mt.stopCh: + // abort and return immediately on stop even if items remain in queue + return + } + } + break + } + // success + b.Reset() + case <-mt.stopCh: + // abort and return immediately on stop even if items remain in queue + return + } + } +} + func (mt *mercuryTransmitter) runQueueLoop() { defer mt.wg.Done() // Exponential backoff with very short retry interval (since latency is a priority) @@ -254,9 +324,10 @@ func (mt *mercuryTransmitter) runQueueLoop() { } } - if err := mt.persistenceManager.Delete(runloopCtx, t.Req); err != nil { - mt.lggr.Errorw("Failed to delete transmit request record", "error", err, "reportCtx", t.ReportCtx) - return + select { + case mt.deleteQueue <- t.Req: + default: + mt.lggr.Criticalw("Delete queue is full", "reportCtx", t.ReportCtx) } } } @@ -289,9 +360,11 @@ func (mt *mercuryTransmitter) Transmit(ctx context.Context, reportCtx ocrtypes.R mt.lggr.Tracew("Transmit enqueue", "req", req, "report", report, "reportCtx", reportCtx, "signatures", signatures) if err := mt.persistenceManager.Insert(ctx, req, reportCtx); err != nil { + mt.transmitQueueInsertErrorCount.Inc() return err } if ok := mt.queue.Push(req, reportCtx); !ok { + mt.transmitQueuePushErrorCount.Inc() return errors.New("transmit queue is closed") } return nil diff --git a/core/services/relay/evm/mercury/transmitter_test.go b/core/services/relay/evm/mercury/transmitter_test.go index 6723ffcbcac..c8a68d41a16 100644 --- a/core/services/relay/evm/mercury/transmitter_test.go +++ b/core/services/relay/evm/mercury/transmitter_test.go @@ -26,6 +26,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { var jobID int32 pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) + q := NewTransmitQueue(lggr, "", 0, nil, nil) t.Run("v1 report transmission successfully enqueued", func(t *testing.T) { report := sampleV1Report @@ -40,6 +41,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { }, } mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), nil) + mt.queue = q err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) require.NoError(t, err) @@ -57,6 +59,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { }, } mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), nil) + mt.queue = q err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) require.NoError(t, err) @@ -74,6 +77,7 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { }, } mt := NewTransmitter(lggr, nil, c, sampleClientPubKey, jobID, sampleFeedID, db, pgtest.NewQConfig(true), nil) + mt.queue = q err := mt.Transmit(testutils.Context(t), sampleReportContext, report, sampleSigs) require.NoError(t, err) diff --git a/core/services/relay/evm/mercury/v1/data_source.go b/core/services/relay/evm/mercury/v1/data_source.go index 5c1f55ddab7..d225dbee68e 100644 --- a/core/services/relay/evm/mercury/v1/data_source.go +++ b/core/services/relay/evm/mercury/v1/data_source.go @@ -13,14 +13,14 @@ import ( relaymercury "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury" relaymercuryv1 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v1" - "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types" + mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v1/reportcodec" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -59,7 +59,7 @@ func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec return &datasource{pr, jb, spec, lggr, rr, orm, reportcodec.ReportCodec{}, feedID, sync.RWMutex{}, enhancedTelemChan, chainHeadTracker, fetcher, initialBlockNumber} } -func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedBlockNum bool) (obs relaymercuryv1.Observation, err error) { +func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedBlockNum bool) (obs relaymercuryv1.Observation, pipelineExecutionErr error) { // setCurrentBlock must come first, along with observationTimestamp, to // avoid front-running ds.setCurrentBlock(ctx, &obs) @@ -116,9 +116,9 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam go func() { defer wg.Done() var run *pipeline.Run - run, trrs, err = ds.executeRun(ctx) - if err != nil { - err = fmt.Errorf("Observe failed while executing run: %w", err) + run, trrs, pipelineExecutionErr = ds.executeRun(ctx) + if pipelineExecutionErr != nil { + pipelineExecutionErr = fmt.Errorf("Observe failed while executing run: %w", pipelineExecutionErr) return } select { @@ -137,27 +137,30 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam } var parsed parseOutput - parsed, err = ds.parse(finaltrrs) - if err != nil { - err = fmt.Errorf("Observe failed while parsing run results: %w", err) + parsed, pipelineExecutionErr = ds.parse(finaltrrs) + if pipelineExecutionErr != nil { + pipelineExecutionErr = fmt.Errorf("Observe failed while parsing run results: %w", pipelineExecutionErr) return } obs.BenchmarkPrice = parsed.benchmarkPrice obs.Bid = parsed.bid obs.Ask = parsed.ask }() - wg.Wait() - if ocrcommon.ShouldCollectEnhancedTelemetryMercury(&ds.jb) { - ocrcommon.EnqueueEnhancedTelem(ds.chEnhancedTelem, ocrcommon.EnhancedTelemetryMercuryData{ - TaskRunResults: trrs, - Observation: obs, - RepTimestamp: repts, - }) + wg.Wait() + if pipelineExecutionErr != nil { + return } - return obs, err + ocrcommon.MaybeEnqueueEnhancedTelem(ds.jb, ds.chEnhancedTelem, ocrcommon.EnhancedTelemetryMercuryData{ + V1Observation: &obs, + TaskRunResults: trrs, + RepTimestamp: repts, + FeedVersion: mercuryutils.REPORT_V1, + }) + + return obs, nil } func toBigInt(val interface{}) (*big.Int, error) { diff --git a/core/services/relay/evm/mercury/v2/data_source.go b/core/services/relay/evm/mercury/v2/data_source.go index caeae8d278a..ecb13ca362c 100644 --- a/core/services/relay/evm/mercury/v2/data_source.go +++ b/core/services/relay/evm/mercury/v2/data_source.go @@ -58,7 +58,7 @@ func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec return &datasource{pr, jb, spec, feedID, lggr, rr, orm, reportcodec.ReportCodec{}, fetcher, linkFeedID, nativeFeedID, sync.RWMutex{}, enhancedTelemChan} } -func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedTimestamp bool) (obs relaymercuryv2.Observation, err error) { +func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedTimestamp bool) (obs relaymercuryv2.Observation, pipelineExecutionErr error) { var wg sync.WaitGroup ctx, cancel := context.WithCancel(ctx) @@ -80,15 +80,15 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam }() } + var trrs pipeline.TaskRunResults wg.Add(1) go func() { defer wg.Done() - var trrs pipeline.TaskRunResults var run *pipeline.Run - run, trrs, err = ds.executeRun(ctx) - if err != nil { + run, trrs, pipelineExecutionErr = ds.executeRun(ctx) + if pipelineExecutionErr != nil { cancel() - err = fmt.Errorf("Observe failed while executing run: %w", err) + pipelineExecutionErr = fmt.Errorf("Observe failed while executing run: %w", pipelineExecutionErr) return } select { @@ -98,12 +98,12 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam } var parsed parseOutput - parsed, err = ds.parse(trrs) - if err != nil { + parsed, pipelineExecutionErr = ds.parse(trrs) + if pipelineExecutionErr != nil { cancel() // This is not expected under normal circumstances - ds.lggr.Errorw("Observe failed while parsing run results", "err", err) - err = fmt.Errorf("Observe failed while parsing run results: %w", err) + ds.lggr.Errorw("Observe failed while parsing run results", "err", pipelineExecutionErr) + pipelineExecutionErr = fmt.Errorf("Observe failed while parsing run results: %w", pipelineExecutionErr) return } obs.BenchmarkPrice = parsed.benchmarkPrice @@ -149,11 +149,12 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam wg.Wait() cancel() + if pipelineExecutionErr != nil { + return + } + if isLink || isNative { - // run has now completed so it is safe to use err or benchmark price - if err != nil { - return - } + // run has now completed so it is safe to use benchmark price if isLink { // This IS the LINK feed, use our observed price obs.LinkPrice.Val, obs.LinkPrice.Err = obs.BenchmarkPrice.Val, obs.BenchmarkPrice.Err @@ -164,16 +165,17 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam } } - // todo: implement telemetry - https://smartcontract-it.atlassian.net/browse/MERC-1388 - // if ocrcommon.ShouldCollectEnhancedTelemetryMercury(&ds.jb) { - // ocrcommon.EnqueueEnhancedTelem(ds.chEnhancedTelem, ocrcommon.EnhancedTelemetryMercuryData{ - // TaskRunResults: trrs, - // Observation: obs, - // RepTimestamp: repts, - // }) - // } + ocrcommon.MaybeEnqueueEnhancedTelem(ds.jb, ds.chEnhancedTelem, ocrcommon.EnhancedTelemetryMercuryData{ + V2Observation: &obs, + TaskRunResults: trrs, + RepTimestamp: repts, + FeedVersion: mercuryutils.REPORT_V2, + FetchMaxFinalizedTimestamp: fetchMaxFinalizedTimestamp, + IsLinkFeed: isLink, + IsNativeFeed: isNative, + }) - return obs, err + return obs, nil } func toBigInt(val interface{}) (*big.Int, error) { diff --git a/core/services/relay/evm/mercury/v3/data_source.go b/core/services/relay/evm/mercury/v3/data_source.go index 79f6c536efd..8fa825a31a3 100644 --- a/core/services/relay/evm/mercury/v3/data_source.go +++ b/core/services/relay/evm/mercury/v3/data_source.go @@ -59,7 +59,7 @@ func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec return &datasource{pr, jb, spec, feedID, lggr, rr, orm, reportcodec.ReportCodec{}, fetcher, linkFeedID, nativeFeedID, sync.RWMutex{}, enhancedTelemChan} } -func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedTimestamp bool) (obs relaymercuryv3.Observation, err error) { +func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedTimestamp bool) (obs relaymercuryv3.Observation, pipelineExecutionErr error) { var wg sync.WaitGroup ctx, cancel := context.WithCancel(ctx) @@ -81,15 +81,15 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam }() } + var trrs pipeline.TaskRunResults wg.Add(1) go func() { defer wg.Done() - var trrs pipeline.TaskRunResults var run *pipeline.Run - run, trrs, err = ds.executeRun(ctx) - if err != nil { + run, trrs, pipelineExecutionErr = ds.executeRun(ctx) + if pipelineExecutionErr != nil { cancel() - err = fmt.Errorf("Observe failed while executing run: %w", err) + pipelineExecutionErr = fmt.Errorf("Observe failed while executing run: %w", pipelineExecutionErr) return } select { @@ -99,12 +99,12 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam } var parsed parseOutput - parsed, err = ds.parse(trrs) - if err != nil { + parsed, pipelineExecutionErr = ds.parse(trrs) + if pipelineExecutionErr != nil { cancel() // This is not expected under normal circumstances - ds.lggr.Errorw("Observe failed while parsing run results", "err", err) - err = fmt.Errorf("Observe failed while parsing run results: %w", err) + ds.lggr.Errorw("Observe failed while parsing run results", "err", pipelineExecutionErr) + pipelineExecutionErr = fmt.Errorf("Observe failed while parsing run results: %w", pipelineExecutionErr) return } obs.BenchmarkPrice = parsed.benchmarkPrice @@ -152,11 +152,12 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam wg.Wait() cancel() + if pipelineExecutionErr != nil { + return + } + if isLink || isNative { - // run has now completed so it is safe to use err or benchmark price - if err != nil { - return - } + // run has now completed so it is safe to use benchmark price if isLink { // This IS the LINK feed, use our observed price obs.LinkPrice.Val, obs.LinkPrice.Err = obs.BenchmarkPrice.Val, obs.BenchmarkPrice.Err @@ -167,17 +168,17 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam } } - // todo: implement telemetry https://smartcontract-it.atlassian.net/browse/MERC-1388 - // if ocrcommon.ShouldCollectEnhancedTelemetryMercury(&ds.jb) { - // ocrcommon.EnqueueEnhancedTelem(ds.chEnhancedTelem, ocrcommon.EnhancedTelemetryMercuryData{ - // TaskRunResults: trrs, - // Observation: obs, - // RepTimestamp: repts, - // }) - // } + ocrcommon.MaybeEnqueueEnhancedTelem(ds.jb, ds.chEnhancedTelem, ocrcommon.EnhancedTelemetryMercuryData{ + V3Observation: &obs, + TaskRunResults: trrs, + RepTimestamp: repts, + FeedVersion: mercuryutils.REPORT_V3, + FetchMaxFinalizedTimestamp: fetchMaxFinalizedTimestamp, + IsLinkFeed: isLink, + IsNativeFeed: isNative, + }) - cancel() - return obs, err + return obs, nil } func toBigInt(val interface{}) (*big.Int, error) { diff --git a/core/services/synchronization/telem/telem_enhanced_ea_mercury.pb.go b/core/services/synchronization/telem/telem_enhanced_ea_mercury.pb.go index 571a725b883..6a2213141a0 100644 --- a/core/services/synchronization/telem/telem_enhanced_ea_mercury.pb.go +++ b/core/services/synchronization/telem/telem_enhanced_ea_mercury.pb.go @@ -25,27 +25,42 @@ type EnhancedEAMercury struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - DataSource string `protobuf:"bytes,1,opt,name=data_source,json=dataSource,proto3" json:"data_source,omitempty"` - DpBenchmarkPrice float64 `protobuf:"fixed64,2,opt,name=dp_benchmark_price,json=dpBenchmarkPrice,proto3" json:"dp_benchmark_price,omitempty"` - DpBid float64 `protobuf:"fixed64,3,opt,name=dp_bid,json=dpBid,proto3" json:"dp_bid,omitempty"` - DpAsk float64 `protobuf:"fixed64,4,opt,name=dp_ask,json=dpAsk,proto3" json:"dp_ask,omitempty"` - CurrentBlockNumber int64 `protobuf:"varint,5,opt,name=current_block_number,json=currentBlockNumber,proto3" json:"current_block_number,omitempty"` - CurrentBlockHash string `protobuf:"bytes,6,opt,name=current_block_hash,json=currentBlockHash,proto3" json:"current_block_hash,omitempty"` - CurrentBlockTimestamp uint64 `protobuf:"varint,7,opt,name=current_block_timestamp,json=currentBlockTimestamp,proto3" json:"current_block_timestamp,omitempty"` - BridgeTaskRunStartedTimestamp int64 `protobuf:"varint,8,opt,name=bridge_task_run_started_timestamp,json=bridgeTaskRunStartedTimestamp,proto3" json:"bridge_task_run_started_timestamp,omitempty"` - BridgeTaskRunEndedTimestamp int64 `protobuf:"varint,9,opt,name=bridge_task_run_ended_timestamp,json=bridgeTaskRunEndedTimestamp,proto3" json:"bridge_task_run_ended_timestamp,omitempty"` - ProviderRequestedTimestamp int64 `protobuf:"varint,10,opt,name=provider_requested_timestamp,json=providerRequestedTimestamp,proto3" json:"provider_requested_timestamp,omitempty"` - ProviderReceivedTimestamp int64 `protobuf:"varint,11,opt,name=provider_received_timestamp,json=providerReceivedTimestamp,proto3" json:"provider_received_timestamp,omitempty"` - ProviderDataStreamEstablished int64 `protobuf:"varint,12,opt,name=provider_data_stream_established,json=providerDataStreamEstablished,proto3" json:"provider_data_stream_established,omitempty"` - ProviderIndicatedTime int64 `protobuf:"varint,13,opt,name=provider_indicated_time,json=providerIndicatedTime,proto3" json:"provider_indicated_time,omitempty"` - Feed string `protobuf:"bytes,14,opt,name=feed,proto3" json:"feed,omitempty"` - ObservationBenchmarkPrice int64 `protobuf:"varint,15,opt,name=observation_benchmark_price,json=observationBenchmarkPrice,proto3" json:"observation_benchmark_price,omitempty"` - ObservationBid int64 `protobuf:"varint,16,opt,name=observation_bid,json=observationBid,proto3" json:"observation_bid,omitempty"` - ObservationAsk int64 `protobuf:"varint,17,opt,name=observation_ask,json=observationAsk,proto3" json:"observation_ask,omitempty"` - ConfigDigest string `protobuf:"bytes,18,opt,name=config_digest,json=configDigest,proto3" json:"config_digest,omitempty"` - Round int64 `protobuf:"varint,19,opt,name=round,proto3" json:"round,omitempty"` - Epoch int64 `protobuf:"varint,20,opt,name=epoch,proto3" json:"epoch,omitempty"` - AssetSymbol string `protobuf:"bytes,21,opt,name=asset_symbol,json=assetSymbol,proto3" json:"asset_symbol,omitempty"` + Version uint32 `protobuf:"varint,32,opt,name=version,proto3" json:"version,omitempty"` + DataSource string `protobuf:"bytes,1,opt,name=data_source,json=dataSource,proto3" json:"data_source,omitempty"` + DpBenchmarkPrice float64 `protobuf:"fixed64,2,opt,name=dp_benchmark_price,json=dpBenchmarkPrice,proto3" json:"dp_benchmark_price,omitempty"` + DpBid float64 `protobuf:"fixed64,3,opt,name=dp_bid,json=dpBid,proto3" json:"dp_bid,omitempty"` + DpAsk float64 `protobuf:"fixed64,4,opt,name=dp_ask,json=dpAsk,proto3" json:"dp_ask,omitempty"` + // v1 fields (block range) + CurrentBlockNumber int64 `protobuf:"varint,5,opt,name=current_block_number,json=currentBlockNumber,proto3" json:"current_block_number,omitempty"` + CurrentBlockHash string `protobuf:"bytes,6,opt,name=current_block_hash,json=currentBlockHash,proto3" json:"current_block_hash,omitempty"` + CurrentBlockTimestamp uint64 `protobuf:"varint,7,opt,name=current_block_timestamp,json=currentBlockTimestamp,proto3" json:"current_block_timestamp,omitempty"` + // v2+v3 fields (timestamp range) + FetchMaxFinalizedTimestamp bool `protobuf:"varint,25,opt,name=fetch_max_finalized_timestamp,json=fetchMaxFinalizedTimestamp,proto3" json:"fetch_max_finalized_timestamp,omitempty"` + MaxFinalizedTimestamp int64 `protobuf:"varint,26,opt,name=max_finalized_timestamp,json=maxFinalizedTimestamp,proto3" json:"max_finalized_timestamp,omitempty"` + ObservationTimestamp uint32 `protobuf:"varint,27,opt,name=observation_timestamp,json=observationTimestamp,proto3" json:"observation_timestamp,omitempty"` + IsLinkFeed bool `protobuf:"varint,28,opt,name=is_link_feed,json=isLinkFeed,proto3" json:"is_link_feed,omitempty"` + LinkPrice int64 `protobuf:"varint,29,opt,name=link_price,json=linkPrice,proto3" json:"link_price,omitempty"` + IsNativeFeed bool `protobuf:"varint,30,opt,name=is_native_feed,json=isNativeFeed,proto3" json:"is_native_feed,omitempty"` + NativePrice int64 `protobuf:"varint,31,opt,name=native_price,json=nativePrice,proto3" json:"native_price,omitempty"` + BridgeTaskRunStartedTimestamp int64 `protobuf:"varint,8,opt,name=bridge_task_run_started_timestamp,json=bridgeTaskRunStartedTimestamp,proto3" json:"bridge_task_run_started_timestamp,omitempty"` + BridgeTaskRunEndedTimestamp int64 `protobuf:"varint,9,opt,name=bridge_task_run_ended_timestamp,json=bridgeTaskRunEndedTimestamp,proto3" json:"bridge_task_run_ended_timestamp,omitempty"` + ProviderRequestedTimestamp int64 `protobuf:"varint,10,opt,name=provider_requested_timestamp,json=providerRequestedTimestamp,proto3" json:"provider_requested_timestamp,omitempty"` + ProviderReceivedTimestamp int64 `protobuf:"varint,11,opt,name=provider_received_timestamp,json=providerReceivedTimestamp,proto3" json:"provider_received_timestamp,omitempty"` + ProviderDataStreamEstablished int64 `protobuf:"varint,12,opt,name=provider_data_stream_established,json=providerDataStreamEstablished,proto3" json:"provider_data_stream_established,omitempty"` + ProviderIndicatedTime int64 `protobuf:"varint,13,opt,name=provider_indicated_time,json=providerIndicatedTime,proto3" json:"provider_indicated_time,omitempty"` + Feed string `protobuf:"bytes,14,opt,name=feed,proto3" json:"feed,omitempty"` + // v1+v2+v3 + ObservationBenchmarkPrice int64 `protobuf:"varint,15,opt,name=observation_benchmark_price,json=observationBenchmarkPrice,proto3" json:"observation_benchmark_price,omitempty"` // This value overflows, will be reserved and removed in future versions + ObservationBenchmarkPriceString string `protobuf:"bytes,22,opt,name=observation_benchmark_price_string,json=observationBenchmarkPriceString,proto3" json:"observation_benchmark_price_string,omitempty"` + // v1+v3 + ObservationBid int64 `protobuf:"varint,16,opt,name=observation_bid,json=observationBid,proto3" json:"observation_bid,omitempty"` // This value overflows, will be reserved and removed in future versions + ObservationAsk int64 `protobuf:"varint,17,opt,name=observation_ask,json=observationAsk,proto3" json:"observation_ask,omitempty"` // This value overflows, will be reserved and removed in future versions + ObservationBidString string `protobuf:"bytes,23,opt,name=observation_bid_string,json=observationBidString,proto3" json:"observation_bid_string,omitempty"` + ObservationAskString string `protobuf:"bytes,24,opt,name=observation_ask_string,json=observationAskString,proto3" json:"observation_ask_string,omitempty"` + ConfigDigest string `protobuf:"bytes,18,opt,name=config_digest,json=configDigest,proto3" json:"config_digest,omitempty"` + Round int64 `protobuf:"varint,19,opt,name=round,proto3" json:"round,omitempty"` + Epoch int64 `protobuf:"varint,20,opt,name=epoch,proto3" json:"epoch,omitempty"` + AssetSymbol string `protobuf:"bytes,21,opt,name=asset_symbol,json=assetSymbol,proto3" json:"asset_symbol,omitempty"` } func (x *EnhancedEAMercury) Reset() { @@ -80,6 +95,13 @@ func (*EnhancedEAMercury) Descriptor() ([]byte, []int) { return file_telem_enhanced_ea_mercury_proto_rawDescGZIP(), []int{0} } +func (x *EnhancedEAMercury) GetVersion() uint32 { + if x != nil { + return x.Version + } + return 0 +} + func (x *EnhancedEAMercury) GetDataSource() string { if x != nil { return x.DataSource @@ -129,6 +151,55 @@ func (x *EnhancedEAMercury) GetCurrentBlockTimestamp() uint64 { return 0 } +func (x *EnhancedEAMercury) GetFetchMaxFinalizedTimestamp() bool { + if x != nil { + return x.FetchMaxFinalizedTimestamp + } + return false +} + +func (x *EnhancedEAMercury) GetMaxFinalizedTimestamp() int64 { + if x != nil { + return x.MaxFinalizedTimestamp + } + return 0 +} + +func (x *EnhancedEAMercury) GetObservationTimestamp() uint32 { + if x != nil { + return x.ObservationTimestamp + } + return 0 +} + +func (x *EnhancedEAMercury) GetIsLinkFeed() bool { + if x != nil { + return x.IsLinkFeed + } + return false +} + +func (x *EnhancedEAMercury) GetLinkPrice() int64 { + if x != nil { + return x.LinkPrice + } + return 0 +} + +func (x *EnhancedEAMercury) GetIsNativeFeed() bool { + if x != nil { + return x.IsNativeFeed + } + return false +} + +func (x *EnhancedEAMercury) GetNativePrice() int64 { + if x != nil { + return x.NativePrice + } + return 0 +} + func (x *EnhancedEAMercury) GetBridgeTaskRunStartedTimestamp() int64 { if x != nil { return x.BridgeTaskRunStartedTimestamp @@ -185,6 +256,13 @@ func (x *EnhancedEAMercury) GetObservationBenchmarkPrice() int64 { return 0 } +func (x *EnhancedEAMercury) GetObservationBenchmarkPriceString() string { + if x != nil { + return x.ObservationBenchmarkPriceString + } + return "" +} + func (x *EnhancedEAMercury) GetObservationBid() int64 { if x != nil { return x.ObservationBid @@ -199,6 +277,20 @@ func (x *EnhancedEAMercury) GetObservationAsk() int64 { return 0 } +func (x *EnhancedEAMercury) GetObservationBidString() string { + if x != nil { + return x.ObservationBidString + } + return "" +} + +func (x *EnhancedEAMercury) GetObservationAskString() string { + if x != nil { + return x.ObservationAskString + } + return "" +} + func (x *EnhancedEAMercury) GetConfigDigest() string { if x != nil { return x.ConfigDigest @@ -227,79 +319,114 @@ func (x *EnhancedEAMercury) GetAssetSymbol() string { return "" } -var File_telem_enhanced_ea_mercury_proto protoreflect.FileDescriptor - -var file_telem_enhanced_ea_mercury_proto_rawDesc = []byte{ - 0x0a, 0x1f, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x5f, 0x65, 0x6e, 0x68, 0x61, 0x6e, 0x63, 0x65, 0x64, - 0x5f, 0x65, 0x61, 0x5f, 0x6d, 0x65, 0x72, 0x63, 0x75, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x12, 0x05, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x22, 0xd5, 0x07, 0x0a, 0x11, 0x45, 0x6e, 0x68, - 0x61, 0x6e, 0x63, 0x65, 0x64, 0x45, 0x41, 0x4d, 0x65, 0x72, 0x63, 0x75, 0x72, 0x79, 0x12, 0x1f, - 0x0a, 0x0b, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x0a, 0x64, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, - 0x2c, 0x0a, 0x12, 0x64, 0x70, 0x5f, 0x62, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x5f, - 0x70, 0x72, 0x69, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x01, 0x52, 0x10, 0x64, 0x70, 0x42, - 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x50, 0x72, 0x69, 0x63, 0x65, 0x12, 0x15, 0x0a, - 0x06, 0x64, 0x70, 0x5f, 0x62, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x01, 0x52, 0x05, 0x64, - 0x70, 0x42, 0x69, 0x64, 0x12, 0x15, 0x0a, 0x06, 0x64, 0x70, 0x5f, 0x61, 0x73, 0x6b, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x01, 0x52, 0x05, 0x64, 0x70, 0x41, 0x73, 0x6b, 0x12, 0x30, 0x0a, 0x14, 0x63, - 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x6e, 0x75, 0x6d, - 0x62, 0x65, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x12, 0x63, 0x75, 0x72, 0x72, 0x65, - 0x6e, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x2c, 0x0a, - 0x12, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x68, - 0x61, 0x73, 0x68, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x63, 0x75, 0x72, 0x72, 0x65, - 0x6e, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x61, 0x73, 0x68, 0x12, 0x36, 0x0a, 0x17, 0x63, - 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x74, 0x69, 0x6d, - 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x07, 0x20, 0x01, 0x28, 0x04, 0x52, 0x15, 0x63, 0x75, - 0x72, 0x72, 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, - 0x61, 0x6d, 0x70, 0x12, 0x48, 0x0a, 0x21, 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x5f, 0x74, 0x61, - 0x73, 0x6b, 0x5f, 0x72, 0x75, 0x6e, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64, 0x5f, 0x74, - 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x1d, - 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x75, 0x6e, 0x53, 0x74, 0x61, - 0x72, 0x74, 0x65, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x44, 0x0a, - 0x1f, 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x5f, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x72, 0x75, 0x6e, - 0x5f, 0x65, 0x6e, 0x64, 0x65, 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, - 0x18, 0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x1b, 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x54, 0x61, - 0x73, 0x6b, 0x52, 0x75, 0x6e, 0x45, 0x6e, 0x64, 0x65, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, - 0x61, 0x6d, 0x70, 0x12, 0x40, 0x0a, 0x1c, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x5f, - 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x65, 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, - 0x61, 0x6d, 0x70, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x03, 0x52, 0x1a, 0x70, 0x72, 0x6f, 0x76, 0x69, - 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x65, 0x64, 0x54, 0x69, 0x6d, 0x65, - 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x3e, 0x0a, 0x1b, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, - 0x72, 0x5f, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, - 0x74, 0x61, 0x6d, 0x70, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x03, 0x52, 0x19, 0x70, 0x72, 0x6f, 0x76, - 0x69, 0x64, 0x65, 0x72, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x64, 0x54, 0x69, 0x6d, 0x65, - 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x47, 0x0a, 0x20, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, - 0x72, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5f, 0x65, 0x73, - 0x74, 0x61, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x64, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x03, 0x52, - 0x1d, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x44, 0x61, 0x74, 0x61, 0x53, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x45, 0x73, 0x74, 0x61, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x64, 0x12, 0x36, - 0x0a, 0x17, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x6e, 0x64, 0x69, 0x63, - 0x61, 0x74, 0x65, 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x03, 0x52, - 0x15, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x49, 0x6e, 0x64, 0x69, 0x63, 0x61, 0x74, - 0x65, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x66, 0x65, 0x65, 0x64, 0x18, 0x0e, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x66, 0x65, 0x65, 0x64, 0x12, 0x3e, 0x0a, 0x1b, 0x6f, 0x62, - 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x62, 0x65, 0x6e, 0x63, 0x68, 0x6d, - 0x61, 0x72, 0x6b, 0x5f, 0x70, 0x72, 0x69, 0x63, 0x65, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x03, 0x52, - 0x19, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x65, 0x6e, 0x63, - 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x50, 0x72, 0x69, 0x63, 0x65, 0x12, 0x27, 0x0a, 0x0f, 0x6f, 0x62, - 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x62, 0x69, 0x64, 0x18, 0x10, 0x20, - 0x01, 0x28, 0x03, 0x52, 0x0e, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x42, 0x69, 0x64, 0x12, 0x27, 0x0a, 0x0f, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x5f, 0x61, 0x73, 0x6b, 0x18, 0x11, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x6f, 0x62, - 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x73, 0x6b, 0x12, 0x23, 0x0a, 0x0d, - 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x5f, 0x64, 0x69, 0x67, 0x65, 0x73, 0x74, 0x18, 0x12, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x0c, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x44, 0x69, 0x67, 0x65, 0x73, - 0x74, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x18, 0x13, 0x20, 0x01, 0x28, 0x03, - 0x52, 0x05, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x70, 0x6f, 0x63, 0x68, - 0x18, 0x14, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x65, 0x70, 0x6f, 0x63, 0x68, 0x12, 0x21, 0x0a, - 0x0c, 0x61, 0x73, 0x73, 0x65, 0x74, 0x5f, 0x73, 0x79, 0x6d, 0x62, 0x6f, 0x6c, 0x18, 0x15, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x73, 0x73, 0x65, 0x74, 0x53, 0x79, 0x6d, 0x62, 0x6f, 0x6c, - 0x42, 0x4e, 0x5a, 0x4c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, - 0x6d, 0x61, 0x72, 0x74, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x6b, 0x69, 0x74, 0x2f, - 0x63, 0x68, 0x61, 0x69, 0x6e, 0x6c, 0x69, 0x6e, 0x6b, 0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, 0x72, - 0x65, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x73, 0x79, 0x6e, 0x63, 0x68, - 0x72, 0x6f, 0x6e, 0x69, 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x6d, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +var File_core_services_synchronization_telem_telem_enhanced_ea_mercury_proto protoreflect.FileDescriptor + +var file_core_services_synchronization_telem_telem_enhanced_ea_mercury_proto_rawDesc = []byte{ + 0x0a, 0x43, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, + 0x73, 0x79, 0x6e, 0x63, 0x68, 0x72, 0x6f, 0x6e, 0x69, 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, + 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x5f, 0x65, 0x6e, 0x68, 0x61, + 0x6e, 0x63, 0x65, 0x64, 0x5f, 0x65, 0x61, 0x5f, 0x6d, 0x65, 0x72, 0x63, 0x75, 0x72, 0x79, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x22, 0xe2, 0x0b, 0x0a, + 0x11, 0x45, 0x6e, 0x68, 0x61, 0x6e, 0x63, 0x65, 0x64, 0x45, 0x41, 0x4d, 0x65, 0x72, 0x63, 0x75, + 0x72, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x20, 0x20, + 0x01, 0x28, 0x0d, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, + 0x64, 0x61, 0x74, 0x61, 0x5f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x0a, 0x64, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x2c, 0x0a, + 0x12, 0x64, 0x70, 0x5f, 0x62, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x5f, 0x70, 0x72, + 0x69, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x01, 0x52, 0x10, 0x64, 0x70, 0x42, 0x65, 0x6e, + 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x50, 0x72, 0x69, 0x63, 0x65, 0x12, 0x15, 0x0a, 0x06, 0x64, + 0x70, 0x5f, 0x62, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x01, 0x52, 0x05, 0x64, 0x70, 0x42, + 0x69, 0x64, 0x12, 0x15, 0x0a, 0x06, 0x64, 0x70, 0x5f, 0x61, 0x73, 0x6b, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x01, 0x52, 0x05, 0x64, 0x70, 0x41, 0x73, 0x6b, 0x12, 0x30, 0x0a, 0x14, 0x63, 0x75, 0x72, + 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x6e, 0x75, 0x6d, 0x62, 0x65, + 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x12, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, + 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x2c, 0x0a, 0x12, 0x63, + 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x68, 0x61, 0x73, + 0x68, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, + 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x61, 0x73, 0x68, 0x12, 0x36, 0x0a, 0x17, 0x63, 0x75, 0x72, + 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x18, 0x07, 0x20, 0x01, 0x28, 0x04, 0x52, 0x15, 0x63, 0x75, 0x72, 0x72, + 0x65, 0x6e, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x12, 0x41, 0x0a, 0x1d, 0x66, 0x65, 0x74, 0x63, 0x68, 0x5f, 0x6d, 0x61, 0x78, 0x5f, 0x66, + 0x69, 0x6e, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x18, 0x19, 0x20, 0x01, 0x28, 0x08, 0x52, 0x1a, 0x66, 0x65, 0x74, 0x63, 0x68, 0x4d, + 0x61, 0x78, 0x46, 0x69, 0x6e, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x12, 0x36, 0x0a, 0x17, 0x6d, 0x61, 0x78, 0x5f, 0x66, 0x69, 0x6e, 0x61, + 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, + 0x1a, 0x20, 0x01, 0x28, 0x03, 0x52, 0x15, 0x6d, 0x61, 0x78, 0x46, 0x69, 0x6e, 0x61, 0x6c, 0x69, + 0x7a, 0x65, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x33, 0x0a, 0x15, + 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x1b, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x14, 0x6f, 0x62, 0x73, + 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x12, 0x20, 0x0a, 0x0c, 0x69, 0x73, 0x5f, 0x6c, 0x69, 0x6e, 0x6b, 0x5f, 0x66, 0x65, 0x65, + 0x64, 0x18, 0x1c, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x69, 0x73, 0x4c, 0x69, 0x6e, 0x6b, 0x46, + 0x65, 0x65, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x6c, 0x69, 0x6e, 0x6b, 0x5f, 0x70, 0x72, 0x69, 0x63, + 0x65, 0x18, 0x1d, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x6c, 0x69, 0x6e, 0x6b, 0x50, 0x72, 0x69, + 0x63, 0x65, 0x12, 0x24, 0x0a, 0x0e, 0x69, 0x73, 0x5f, 0x6e, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, + 0x66, 0x65, 0x65, 0x64, 0x18, 0x1e, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x69, 0x73, 0x4e, 0x61, + 0x74, 0x69, 0x76, 0x65, 0x46, 0x65, 0x65, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x6e, 0x61, 0x74, 0x69, + 0x76, 0x65, 0x5f, 0x70, 0x72, 0x69, 0x63, 0x65, 0x18, 0x1f, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, + 0x6e, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x72, 0x69, 0x63, 0x65, 0x12, 0x48, 0x0a, 0x21, 0x62, + 0x72, 0x69, 0x64, 0x67, 0x65, 0x5f, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x72, 0x75, 0x6e, 0x5f, 0x73, + 0x74, 0x61, 0x72, 0x74, 0x65, 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x1d, 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x54, 0x61, + 0x73, 0x6b, 0x52, 0x75, 0x6e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64, 0x54, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x44, 0x0a, 0x1f, 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x5f, + 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x72, 0x75, 0x6e, 0x5f, 0x65, 0x6e, 0x64, 0x65, 0x64, 0x5f, 0x74, + 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x1b, + 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x75, 0x6e, 0x45, 0x6e, 0x64, + 0x65, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x40, 0x0a, 0x1c, 0x70, + 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x65, + 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x0a, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x1a, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x65, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x3e, 0x0a, + 0x1b, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x5f, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, + 0x65, 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x0b, 0x20, 0x01, + 0x28, 0x03, 0x52, 0x19, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x52, 0x65, 0x63, 0x65, + 0x69, 0x76, 0x65, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x47, 0x0a, + 0x20, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x73, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5f, 0x65, 0x73, 0x74, 0x61, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, + 0x64, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x03, 0x52, 0x1d, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, + 0x72, 0x44, 0x61, 0x74, 0x61, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x45, 0x73, 0x74, 0x61, 0x62, + 0x6c, 0x69, 0x73, 0x68, 0x65, 0x64, 0x12, 0x36, 0x0a, 0x17, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, + 0x65, 0x72, 0x5f, 0x69, 0x6e, 0x64, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x74, 0x69, 0x6d, + 0x65, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x03, 0x52, 0x15, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, + 0x72, 0x49, 0x6e, 0x64, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x12, + 0x0a, 0x04, 0x66, 0x65, 0x65, 0x64, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x66, 0x65, + 0x65, 0x64, 0x12, 0x3e, 0x0a, 0x1b, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x5f, 0x62, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x5f, 0x70, 0x72, 0x69, 0x63, + 0x65, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x03, 0x52, 0x19, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x50, 0x72, 0x69, + 0x63, 0x65, 0x12, 0x4b, 0x0a, 0x22, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x5f, 0x62, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, 0x5f, 0x70, 0x72, 0x69, 0x63, + 0x65, 0x5f, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x18, 0x16, 0x20, 0x01, 0x28, 0x09, 0x52, 0x1f, + 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x65, 0x6e, 0x63, 0x68, + 0x6d, 0x61, 0x72, 0x6b, 0x50, 0x72, 0x69, 0x63, 0x65, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x12, + 0x27, 0x0a, 0x0f, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x62, + 0x69, 0x64, 0x18, 0x10, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x69, 0x64, 0x12, 0x27, 0x0a, 0x0f, 0x6f, 0x62, 0x73, 0x65, + 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x61, 0x73, 0x6b, 0x18, 0x11, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x0e, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x73, + 0x6b, 0x12, 0x34, 0x0a, 0x16, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x5f, 0x62, 0x69, 0x64, 0x5f, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x18, 0x17, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x14, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x69, + 0x64, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x34, 0x0a, 0x16, 0x6f, 0x62, 0x73, 0x65, 0x72, + 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x61, 0x73, 0x6b, 0x5f, 0x73, 0x74, 0x72, 0x69, 0x6e, + 0x67, 0x18, 0x18, 0x20, 0x01, 0x28, 0x09, 0x52, 0x14, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x73, 0x6b, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x23, 0x0a, + 0x0d, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x5f, 0x64, 0x69, 0x67, 0x65, 0x73, 0x74, 0x18, 0x12, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x44, 0x69, 0x67, 0x65, + 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x18, 0x13, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x05, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x70, 0x6f, 0x63, + 0x68, 0x18, 0x14, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x65, 0x70, 0x6f, 0x63, 0x68, 0x12, 0x21, + 0x0a, 0x0c, 0x61, 0x73, 0x73, 0x65, 0x74, 0x5f, 0x73, 0x79, 0x6d, 0x62, 0x6f, 0x6c, 0x18, 0x15, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x73, 0x73, 0x65, 0x74, 0x53, 0x79, 0x6d, 0x62, 0x6f, + 0x6c, 0x42, 0x4e, 0x5a, 0x4c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x73, 0x6d, 0x61, 0x72, 0x74, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x6b, 0x69, 0x74, + 0x2f, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x6c, 0x69, 0x6e, 0x6b, 0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, + 0x72, 0x65, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x73, 0x79, 0x6e, 0x63, + 0x68, 0x72, 0x6f, 0x6e, 0x69, 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x74, 0x65, 0x6c, 0x65, + 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto b/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto index 805d976d2d6..63bcb8999e6 100644 --- a/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto +++ b/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto @@ -5,23 +5,45 @@ option go_package = "github.com/smartcontractkit/chainlink/v2/core/services/sync package telem; message EnhancedEAMercury { + uint32 version = 32; + string data_source=1; double dp_benchmark_price=2; double dp_bid=3; double dp_ask=4; + + // v1 fields (block range) int64 current_block_number=5; string current_block_hash=6; uint64 current_block_timestamp=7; + + // v2+v3 fields (timestamp range) + bool fetch_max_finalized_timestamp = 25; + int64 max_finalized_timestamp=26; + uint32 observation_timestamp=27; + bool is_link_feed=28; + int64 link_price=29; + bool is_native_feed=30; + int64 native_price=31; + int64 bridge_task_run_started_timestamp=8; int64 bridge_task_run_ended_timestamp=9; int64 provider_requested_timestamp=10; int64 provider_received_timestamp=11; int64 provider_data_stream_established=12; int64 provider_indicated_time=13; + string feed=14; - int64 observation_benchmark_price=15; - int64 observation_bid=16; - int64 observation_ask=17; + + // v1+v2+v3 + int64 observation_benchmark_price=15; // This value overflows, will be reserved and removed in future versions + string observation_benchmark_price_string = 22; + // v1+v3 + int64 observation_bid=16; // This value overflows, will be reserved and removed in future versions + int64 observation_ask=17; // This value overflows, will be reserved and removed in future versions + string observation_bid_string = 23; + string observation_ask_string = 24; + string config_digest = 18; int64 round=19; int64 epoch=20; diff --git a/core/store/migrate/migrations/0205_add_feed_id_to_mercury_transmit_requests.sql b/core/store/migrate/migrations/0205_add_feed_id_to_mercury_transmit_requests.sql new file mode 100644 index 00000000000..04cf5a2571d --- /dev/null +++ b/core/store/migrate/migrations/0205_add_feed_id_to_mercury_transmit_requests.sql @@ -0,0 +1,14 @@ +-- +goose Up +ALTER TABLE mercury_transmit_requests ADD COLUMN feed_id BYTEA CHECK (feed_id IS NULL OR octet_length(feed_id) = 32); +DROP INDEX idx_mercury_transmission_requests_epoch_round; +CREATE INDEX idx_mercury_transmission_requests_job_id_epoch_round ON mercury_transmit_requests (job_id, epoch DESC, round DESC); +CREATE INDEX idx_mercury_transmit_requests_job_id ON mercury_transmit_requests (job_id); +CREATE INDEX idx_mercury_transmit_requests_feed_id ON mercury_transmit_requests (feed_id); +CREATE INDEX idx_mercury_feed_latest_reports_job_id ON feed_latest_reports (job_id); + +-- +goose Down +ALTER TABLE mercury_transmit_requests DROP COLUMN feed_id; +DROP INDEX idx_mercury_transmit_requests_job_id; +DROP INDEX idx_mercury_feed_latest_reports_job_id; +CREATE INDEX idx_mercury_transmission_requests_epoch_round ON mercury_transmit_requests (epoch DESC, round DESC); +DROP INDEX idx_mercury_transmission_requests_job_id_epoch_round; diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 64bace19935..e1081f3dbab 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -9,6 +9,31 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [dev] +### Added + +- Added a new, optional WebServer authentication option that supports LDAP as a user identity provider. This enables user login access and user roles to be managed and provisioned via a centralized remote server that supports the LDAP protocol, which can be helpful when running multiple nodes. See the documentation for more information and config setup instructions. There is a new `[WebServer].AuthenticationMethod` config option, when set to `ldap` requires the new `[WebServer.LDAP]` config section to be defined, see the reference `docs/core.toml`. +- New prom metrics for mercury: + `mercury_transmit_queue_delete_error_count` + `mercury_transmit_queue_insert_error_count` + `mercury_transmit_queue_push_error_count` + Nops should consider alerting on these. + + +### Changed + +- `L2Suggested` mode is now called `SuggestedPrice` + +### Removed + +- Removed `Optimism2` as a supported gas estimator mode + +### Added + +- Mercury v0.2 has improved consensus around current block that uses the most recent 5 blocks instead of only the latest one +- Two new prom metrics for mercury, nops should consider adding alerting on these: + - `mercury_insufficient_blocks_count` + - `mercury_zero_blocks_count` + ... ## 2.6.0 - UNRELEASED