diff --git a/core/services/ocrcommon/telemetry.go b/core/services/ocrcommon/telemetry.go index 3a0ba049036..54a5002093d 100644 --- a/core/services/ocrcommon/telemetry.go +++ b/core/services/ocrcommon/telemetry.go @@ -354,7 +354,7 @@ func ShouldCollectEnhancedTelemetryMercury(job *job.Job) bool { // bid and ask. This functions expects the pipeline.TaskRunResults to be correctly ordered func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.TaskRunResult, allTasks *pipeline.TaskRunResults) (float64, float64, float64) { var benchmarkPrice, askPrice, bidPrice float64 - var ok bool + var err error //We rely on task results to be sorted in the correct order benchmarkPriceTask := allTasks.GetNextTaskOf(startTask) if benchmarkPriceTask == nil { @@ -365,9 +365,9 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.Ta if benchmarkPriceTask.Result.Error != nil { e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry benchmark price, job %d, id %s: %s", e.job.ID, benchmarkPriceTask.Task.DotID(), benchmarkPriceTask.Result.Error), "err", benchmarkPriceTask.Result.Error) } else { - benchmarkPrice, ok = benchmarkPriceTask.Result.Value.(float64) - if !ok { - e.lggr.Warnf("cannot parse enhanced EA telemetry benchmark price, job %d, id %s (expected float64, got type: %T)", e.job.ID, benchmarkPriceTask.Task.DotID(), benchmarkPriceTask.Result.Value) + benchmarkPrice, err = getResultFloat64(benchmarkPriceTask) + if err != nil { + e.lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry benchmark price, job %d, id %s", e.job.ID, benchmarkPriceTask.Task.DotID()), "err", err) } } } @@ -375,15 +375,15 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.Ta bidTask := allTasks.GetNextTaskOf(*benchmarkPriceTask) if bidTask == nil { e.lggr.Warnf("cannot parse enhanced EA telemetry bid price, task is nil, job %d, id %s", e.job.ID) - return 0, 0, 0 + return benchmarkPrice, 0, 0 } if bidTask.Task.Type() == pipeline.TaskTypeJSONParse { if bidTask.Result.Error != nil { e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry bid price, job %d, id %s: %s", e.job.ID, bidTask.Task.DotID(), bidTask.Result.Error), "err", bidTask.Result.Error) } else { - bidPrice, ok = bidTask.Result.Value.(float64) - if !ok { - e.lggr.Warnf("cannot parse enhanced EA telemetry bid price, job %d, id %s (expected float64, got type: %T)", e.job.ID, bidTask.Task.DotID(), bidTask.Result.Value) + bidPrice, err = getResultFloat64(bidTask) + if err != nil { + e.lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry bid price, job %d, id %s", e.job.ID, bidTask.Task.DotID()), "err", err) } } } @@ -391,15 +391,15 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResults(startTask pipeline.Ta askTask := allTasks.GetNextTaskOf(*bidTask) if askTask == nil { e.lggr.Warnf("cannot parse enhanced EA telemetry ask price, task is nil, job %d, id %s", e.job.ID) - return 0, 0, 0 + return benchmarkPrice, bidPrice, 0 } if askTask.Task.Type() == pipeline.TaskTypeJSONParse { if bidTask.Result.Error != nil { e.lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry ask price, job %d, id %s: %s", e.job.ID, askTask.Task.DotID(), askTask.Result.Error), "err", askTask.Result.Error) } else { - askPrice, ok = askTask.Result.Value.(float64) - if !ok { - e.lggr.Warnf("cannot parse enhanced EA telemetry ask price, job %d, id %s (expected float64, got type: %T)", e.job.ID, askTask.Task.DotID(), askTask.Result.Value) + askPrice, err = getResultFloat64(askTask) + if err != nil { + e.lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry ask price, job %d, id %s", e.job.ID, askTask.Task.DotID()), "err", err) } } } @@ -431,3 +431,13 @@ func EnqueueEnhancedTelem[T EnhancedTelemetryData | EnhancedTelemetryMercuryData default: } } + +// getResultFloat64 will check the result type and force it to float64 or returns an error if the conversion cannot be made +func getResultFloat64(task *pipeline.TaskRunResult) (float64, error) { + result, err := utils.ToDecimal(task.Result.Value) + if err != nil { + return 0, err + } + resultFloat64, _ := result.Float64() + return resultFloat64, nil +} diff --git a/core/services/ocrcommon/telemetry_test.go b/core/services/ocrcommon/telemetry_test.go index e7c41c46761..495dc6fe788 100644 --- a/core/services/ocrcommon/telemetry_test.go +++ b/core/services/ocrcommon/telemetry_test.go @@ -416,7 +416,7 @@ var trrsMercury = pipeline.TaskRunResults{ BaseTask: pipeline.NewBaseTask(3, "ds3_ask", nil, nil, 3), }, Result: pipeline.Result{ - Value: float64(123456789.1), + Value: int64(321123), }, }, } @@ -461,7 +461,7 @@ func TestGetPricesFromResults(t *testing.T) { benchmarkPrice, bid, ask := e.getPricesFromResults(trrsMercury[0], &trrsMercury) require.Equal(t, 123456.123456, benchmarkPrice) require.Equal(t, 1234567.1234567, bid) - require.Equal(t, 123456789.1, ask) + require.Equal(t, float64(321123), ask) benchmarkPrice, bid, ask = e.getPricesFromResults(trrsMercury[0], &pipeline.TaskRunResults{}) require.Equal(t, float64(0), benchmarkPrice) @@ -601,7 +601,7 @@ func TestCollectMercuryEnhancedTelemetry(t *testing.T) { DataSource: "data-source-name", DpBenchmarkPrice: 123456.123456, DpBid: 1234567.1234567, - DpAsk: 123456789.1, + DpAsk: 321123, CurrentBlockNumber: 123456789, CurrentBlockHash: common.HexToHash("0x123321").String(), CurrentBlockTimestamp: 987654321, diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 299e248428c..64bace19935 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -41,6 +41,7 @@ All nodes will have to remove the following configuration field: `ExplorerURL` - Fixed a bug where `evmChainId` is requested instead of `id` or `evm-chain-id` in CLI error verbatim - Fixed a bug that would cause the node to shut down while performing backup - Fixed health checker to include more services in the prometheus `health` metric and HTTP `/health` endpoint +- Fixed a bug where prices would not be parsed correctly in telemetry data