From a0d94f5ae90553bcc6da8dd8cfe8d1c3baa55ea1 Mon Sep 17 00:00:00 2001 From: Julian Ventura Date: Tue, 10 Dec 2024 16:00:10 -0300 Subject: [PATCH] Add gas price and effective gas price traces --- aggregator/pkg/aggregator.go | 13 +++++---- aggregator/pkg/telemetry.go | 28 ++++++++++--------- core/chainio/avs_writer.go | 6 ++-- telemetry_api/lib/telemetry_api/traces.ex | 16 +++++------ .../controllers/trace_controller.ex | 14 +++++----- telemetry_api/lib/telemetry_api_web/router.ex | 2 +- 6 files changed, 42 insertions(+), 37 deletions(-) diff --git a/aggregator/pkg/aggregator.go b/aggregator/pkg/aggregator.go index 467b3b52b..4d158486f 100644 --- a/aggregator/pkg/aggregator.go +++ b/aggregator/pkg/aggregator.go @@ -285,10 +285,12 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA if err == nil { // In some cases, we may fail to retrieve the receipt for the transaction. txHash := "Unknown" + effectiveGasPrice := "Unknown" if receipt != nil { txHash = receipt.TxHash.String() + effectiveGasPrice = receipt.EffectiveGasPrice.String() } - agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, txHash) + agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, txHash, effectiveGasPrice) agg.logger.Info("Aggregator successfully responded to task", "taskIndex", blsAggServiceResp.TaskIndex, "batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:])) @@ -316,9 +318,8 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc "batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:])) // This function is a callback that is called when the gas price is bumped on the avsWriter.SendAggregatedResponse - onGasPriceBumped := func(bumpedGasPrice *big.Int) { - agg.metrics.IncBumpedGasPriceForAggregatedResponse() - agg.telemetry.BumpedTaskGasPrice(batchMerkleRoot, bumpedGasPrice.String()) + onSetGasPrice := func(gasPrice *big.Int) { + agg.telemetry.TaskSetGasPrice(batchMerkleRoot, gasPrice.String()) } receipt, err := agg.avsWriter.SendAggregatedResponse( batchIdentifierHash, @@ -329,12 +330,12 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc agg.AggregatorConfig.Aggregator.GasBumpIncrementalPercentage, agg.AggregatorConfig.Aggregator.GasBumpPercentageLimit, agg.AggregatorConfig.Aggregator.TimeToWaitBeforeBump, - onGasPriceBumped, + agg.metrics, + onSetGasPrice, ) if err != nil { agg.walletMutex.Unlock() agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchIdentifierHash[:]), err) - agg.telemetry.LogTaskError(batchMerkleRoot, err) return nil, err } diff --git a/aggregator/pkg/telemetry.go b/aggregator/pkg/telemetry.go index 94737d2ba..01dd132a3 100644 --- a/aggregator/pkg/telemetry.go +++ b/aggregator/pkg/telemetry.go @@ -30,14 +30,15 @@ type TaskErrorMessage struct { TaskError string `json:"error"` } -type TaskGasPriceBumpMessage struct { - MerkleRoot string `json:"merkle_root"` - BumpedGasPrice string `json:"bumped_gas_price"` +type TaskSetGasPriceMessage struct { + MerkleRoot string `json:"merkle_root"` + GasPrice string `json:"gas_price"` } type TaskSentToEthereumMessage struct { - MerkleRoot string `json:"merkle_root"` - TxHash string `json:"tx_hash"` + MerkleRoot string `json:"merkle_root"` + TxHash string `json:"tx_hash"` + EffectiveGasPrice string `json:"effective_gas_price"` } type Telemetry struct { @@ -101,20 +102,21 @@ func (t *Telemetry) LogTaskError(batchMerkleRoot [32]byte, taskError error) { } } -func (t *Telemetry) BumpedTaskGasPrice(batchMerkleRoot [32]byte, bumpedGasPrice string) { - body := TaskGasPriceBumpMessage{ - MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])), - BumpedGasPrice: bumpedGasPrice, +func (t *Telemetry) TaskSetGasPrice(batchMerkleRoot [32]byte, gasPrice string) { + body := TaskSetGasPriceMessage{ + MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])), + GasPrice: gasPrice, } - if err := t.sendTelemetryMessage("/api/aggregatorTaskGasPriceBump", body); err != nil { + if err := t.sendTelemetryMessage("/api/aggregatorTaskSetGasPrice", body); err != nil { t.logger.Warn("[Telemetry] Error in LogOperatorResponse", "error", err) } } -func (t *Telemetry) TaskSentToEthereum(batchMerkleRoot [32]byte, txHash string) { +func (t *Telemetry) TaskSentToEthereum(batchMerkleRoot [32]byte, txHash string, effectiveGasPrice string) { body := TaskSentToEthereumMessage{ - MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])), - TxHash: txHash, + MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])), + TxHash: txHash, + EffectiveGasPrice: effectiveGasPrice, } if err := t.sendTelemetryMessage("/api/aggregatorTaskSent", body); err != nil { t.logger.Warn("[Telemetry] Error in TaskSentToEthereum", "error", err) diff --git a/core/chainio/avs_writer.go b/core/chainio/avs_writer.go index cd379ee23..e98051413 100644 --- a/core/chainio/avs_writer.go +++ b/core/chainio/avs_writer.go @@ -89,7 +89,7 @@ func NewAvsWriterFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.E // - If no receipt is found, but the batch state indicates the response has already been processed, it exits // without an error (returning `nil, nil`). // - An error if the process encounters a fatal issue (e.g., permanent failure in verifying balances or state). -func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, gasBumpPercentage uint, gasBumpIncrementalPercentage uint, gasBumpPercentageLimit uint, timeToWaitBeforeBump time.Duration, onGasPriceBumped func(*big.Int)) (*types.Receipt, error) { +func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, gasBumpPercentage uint, gasBumpIncrementalPercentage uint, gasBumpPercentageLimit uint, timeToWaitBeforeBump time.Duration, metrics *metrics.Metrics, onSetGasPrice func(*big.Int)) (*types.Receipt, error) { txOpts := *w.Signer.GetTxOpts() txOpts.NoSend = true // simulate the transaction simTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature, retry.SendToChainRetryParams()) @@ -141,6 +141,8 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe txOpts.GasPrice = minimumGasPriceBump } + onSetGasPrice(txOpts.GasPrice) + if i > 0 { w.logger.Infof("Trying to get old sent transaction receipt before sending a new transaction", "merkle root", batchMerkleRootHashString) for _, tx := range sentTxs { @@ -161,7 +163,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe } w.logger.Infof("Batch state has not been responded yet, will send a new tx", "merkle root", batchMerkleRootHashString) - onGasPriceBumped(txOpts.GasPrice) + metrics.IncBumpedGasPriceForAggregatedResponse() } // We compare both Aggregator funds and Batcher balance in Aligned against respondToTaskFeeLimit diff --git a/telemetry_api/lib/telemetry_api/traces.ex b/telemetry_api/lib/telemetry_api/traces.ex index e611f386a..d024b88cf 100644 --- a/telemetry_api/lib/telemetry_api/traces.ex +++ b/telemetry_api/lib/telemetry_api/traces.ex @@ -300,18 +300,18 @@ defmodule TelemetryApi.Traces do end @doc """ - Registers a bump in the gas price when the aggregator tries to respond to a task in the task trace. + Registers a set gas price when the aggregator tries to respond to a task in the task trace. ## Examples iex> merkle_root - iex> bumped_gas_price - iex> aggregator_task_gas_price_bumped(merkle_root, bumped_gas_price) + iex> gas_price + iex> aggregator_task_set_gas_price(merkle_root, gas_price) :ok """ - def aggregator_task_gas_price_bumped(merkle_root, bumped_gas_price) do + def aggregator_task_set_gas_price(merkle_root, gas_price) do with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do - Tracer.add_event("Task gas price bumped", [{"bumped__gas_price", bumped_gas_price}]) + Tracer.add_event("Gas price set", [{"gas_price", gas_price}]) :ok end end @@ -323,12 +323,12 @@ defmodule TelemetryApi.Traces do iex> merkle_root iex> tx_hash - iex> aggregator_task_sent(merkle_root, tx_hash) + iex> aggregator_task_sent(merkle_root, tx_hash, effective_gas_price) :ok """ - def aggregator_task_sent(merkle_root, tx_hash) do + def aggregator_task_sent(merkle_root, tx_hash, effective_gas_price) do with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do - Tracer.add_event("Task Sent to Ethereum", [{"tx_hash", tx_hash}]) + Tracer.add_event("Task Sent to Ethereum", [{"tx_hash", tx_hash}, {"effective_gas_price", effective_gas_price}]) :ok end end diff --git a/telemetry_api/lib/telemetry_api_web/controllers/trace_controller.ex b/telemetry_api/lib/telemetry_api_web/controllers/trace_controller.ex index b9e334652..5abf82f36 100644 --- a/telemetry_api/lib/telemetry_api_web/controllers/trace_controller.ex +++ b/telemetry_api/lib/telemetry_api_web/controllers/trace_controller.ex @@ -123,14 +123,14 @@ defmodule TelemetryApiWeb.TraceController do end @doc """ - Registers a gas price bump in the trace of the given merkle_root - Method: POST aggregatorTaskGasPriceBump + Registers a gas price in the trace of the given merkle_root + Method: POST aggregatorTaskSetGasPrice """ - def aggregator_task_gas_price_bumped(conn, %{ + def aggregator_task_set_gas_price(conn, %{ "merkle_root" => merkle_root, - "bumped_gas_price" => bumped_gas_price + "gas_price" => gas_price }) do - with :ok <- Traces.aggregator_task_gas_price_bumped(merkle_root, bumped_gas_price) do + with :ok <- Traces.aggregator_task_set_gas_price(merkle_root, gas_price) do conn |> put_status(:ok) |> render(:show_merkle, merkle_root: merkle_root) @@ -141,8 +141,8 @@ defmodule TelemetryApiWeb.TraceController do Register a task sent, from the aggregator, to Ethereum in the trace of the given merkle_root Method: POST aggregatorTaskSent """ - def aggregator_task_sent(conn, %{"merkle_root" => merkle_root, "tx_hash" => tx_hash}) do - with :ok <- Traces.aggregator_task_sent(merkle_root, tx_hash) do + def aggregator_task_sent(conn, %{"merkle_root" => merkle_root, "tx_hash" => tx_hash, "effective_gas_price" => effective_gas_price}) do + with :ok <- Traces.aggregator_task_sent(merkle_root, tx_hash, effective_gas_price) do conn |> put_status(:ok) |> render(:show_merkle, merkle_root: merkle_root) diff --git a/telemetry_api/lib/telemetry_api_web/router.ex b/telemetry_api/lib/telemetry_api_web/router.ex index 7115cced6..6d26b5241 100644 --- a/telemetry_api/lib/telemetry_api_web/router.ex +++ b/telemetry_api/lib/telemetry_api_web/router.ex @@ -15,7 +15,7 @@ defmodule TelemetryApiWeb.Router do post "/operatorResponse", TraceController, :register_operator_response post "/quorumReached", TraceController, :quorum_reached post "/taskError", TraceController, :task_error - post "/aggregatorTaskGasPriceBump", TraceController, :aggregator_task_gas_price_bumped + post "/aggregatorTaskSetGasPrice", TraceController, :aggregator_task_set_gas_price post "/aggregatorTaskSent", TraceController, :aggregator_task_sent post "/finishTaskTrace", TraceController, :finish_task_trace