Skip to content

Commit

Permalink
feat: Improve jaeger traces on the aggregator (#1603)
Browse files Browse the repository at this point in the history
Co-authored-by: Julian Ventura <[email protected]>
  • Loading branch information
2 people authored and PatStiles committed Jan 10, 2025
1 parent 7f84d87 commit d0cd0eb
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 37 deletions.
13 changes: 7 additions & 6 deletions aggregator/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,12 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
if err == nil {
// In some cases, we may fail to retrieve the receipt for the transaction.
txHash := "Unknown"
effectiveGasPrice := "Unknown"
if receipt != nil {
txHash = receipt.TxHash.String()
effectiveGasPrice = receipt.EffectiveGasPrice.String()
}
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, txHash)
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, txHash, effectiveGasPrice)
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
Expand Down Expand Up @@ -326,9 +328,8 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))

// This function is a callback that is called when the gas price is bumped on the avsWriter.SendAggregatedResponse
onGasPriceBumped := func(bumpedGasPrice *big.Int) {
agg.metrics.IncBumpedGasPriceForAggregatedResponse()
agg.telemetry.BumpedTaskGasPrice(batchMerkleRoot, bumpedGasPrice.String())
onSetGasPrice := func(gasPrice *big.Int) {
agg.telemetry.TaskSetGasPrice(batchMerkleRoot, gasPrice.String())
}

startTime := time.Now()
Expand All @@ -341,12 +342,12 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
agg.AggregatorConfig.Aggregator.GasBumpIncrementalPercentage,
agg.AggregatorConfig.Aggregator.GasBumpPercentageLimit,
agg.AggregatorConfig.Aggregator.TimeToWaitBeforeBump,
onGasPriceBumped,
agg.metrics,
onSetGasPrice,
)
if err != nil {
agg.walletMutex.Unlock()
agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchIdentifierHash[:]), err)
agg.telemetry.LogTaskError(batchMerkleRoot, err)
return nil, err
}

Expand Down
28 changes: 15 additions & 13 deletions aggregator/pkg/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ type TaskErrorMessage struct {
TaskError string `json:"error"`
}

type TaskGasPriceBumpMessage struct {
MerkleRoot string `json:"merkle_root"`
BumpedGasPrice string `json:"bumped_gas_price"`
type TaskSetGasPriceMessage struct {
MerkleRoot string `json:"merkle_root"`
GasPrice string `json:"gas_price"`
}

type TaskSentToEthereumMessage struct {
MerkleRoot string `json:"merkle_root"`
TxHash string `json:"tx_hash"`
MerkleRoot string `json:"merkle_root"`
TxHash string `json:"tx_hash"`
EffectiveGasPrice string `json:"effective_gas_price"`
}

type Telemetry struct {
Expand Down Expand Up @@ -101,20 +102,21 @@ func (t *Telemetry) LogTaskError(batchMerkleRoot [32]byte, taskError error) {
}
}

func (t *Telemetry) BumpedTaskGasPrice(batchMerkleRoot [32]byte, bumpedGasPrice string) {
body := TaskGasPriceBumpMessage{
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
BumpedGasPrice: bumpedGasPrice,
func (t *Telemetry) TaskSetGasPrice(batchMerkleRoot [32]byte, gasPrice string) {
body := TaskSetGasPriceMessage{
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
GasPrice: gasPrice,
}
if err := t.sendTelemetryMessage("/api/aggregatorTaskGasPriceBump", body); err != nil {
if err := t.sendTelemetryMessage("/api/aggregatorTaskSetGasPrice", body); err != nil {
t.logger.Warn("[Telemetry] Error in LogOperatorResponse", "error", err)
}
}

func (t *Telemetry) TaskSentToEthereum(batchMerkleRoot [32]byte, txHash string) {
func (t *Telemetry) TaskSentToEthereum(batchMerkleRoot [32]byte, txHash string, effectiveGasPrice string) {
body := TaskSentToEthereumMessage{
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
TxHash: txHash,
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
TxHash: txHash,
EffectiveGasPrice: effectiveGasPrice,
}
if err := t.sendTelemetryMessage("/api/aggregatorTaskSent", body); err != nil {
t.logger.Warn("[Telemetry] Error in TaskSentToEthereum", "error", err)
Expand Down
6 changes: 4 additions & 2 deletions core/chainio/avs_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func NewAvsWriterFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.E
// - If no receipt is found, but the batch state indicates the response has already been processed, it exits
// without an error (returning `nil, nil`).
// - An error if the process encounters a fatal issue (e.g., permanent failure in verifying balances or state).
func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, gasBumpPercentage uint, gasBumpIncrementalPercentage uint, gasBumpPercentageLimit uint, timeToWaitBeforeBump time.Duration, onGasPriceBumped func(*big.Int)) (*types.Receipt, error) {
func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, gasBumpPercentage uint, gasBumpIncrementalPercentage uint, gasBumpPercentageLimit uint, timeToWaitBeforeBump time.Duration, metrics *metrics.Metrics, onSetGasPrice func(*big.Int)) (*types.Receipt, error) {
txOpts := *w.Signer.GetTxOpts()
txOpts.NoSend = true // simulate the transaction
simTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature, retry.SendToChainRetryParams())
Expand Down Expand Up @@ -141,6 +141,8 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
txOpts.GasPrice = minimumGasPriceBump
}

onSetGasPrice(txOpts.GasPrice)

if i > 0 {
w.logger.Infof("Trying to get old sent transaction receipt before sending a new transaction", "merkle root", batchMerkleRootHashString)
for _, tx := range sentTxs {
Expand All @@ -161,7 +163,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
}
w.logger.Infof("Batch state has not been responded yet, will send a new tx", "merkle root", batchMerkleRootHashString)

onGasPriceBumped(txOpts.GasPrice)
metrics.IncBumpedGasPriceForAggregatedResponse()
}

// We compare both Aggregator funds and Batcher balance in Aligned against respondToTaskFeeLimit
Expand Down
16 changes: 8 additions & 8 deletions telemetry_api/lib/telemetry_api/traces.ex
Original file line number Diff line number Diff line change
Expand Up @@ -300,18 +300,18 @@ defmodule TelemetryApi.Traces do
end

@doc """
Registers a bump in the gas price when the aggregator tries to respond to a task in the task trace.
Registers a set gas price when the aggregator tries to respond to a task in the task trace.
## Examples
iex> merkle_root
iex> bumped_gas_price
iex> aggregator_task_gas_price_bumped(merkle_root, bumped_gas_price)
iex> gas_price
iex> aggregator_task_set_gas_price(merkle_root, gas_price)
:ok
"""
def aggregator_task_gas_price_bumped(merkle_root, bumped_gas_price) do
def aggregator_task_set_gas_price(merkle_root, gas_price) do
with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do
Tracer.add_event("Task gas price bumped", [{"bumped__gas_price", bumped_gas_price}])
Tracer.add_event("Gas price set", [{"gas_price", gas_price}])
:ok
end
end
Expand All @@ -323,12 +323,12 @@ defmodule TelemetryApi.Traces do
iex> merkle_root
iex> tx_hash
iex> aggregator_task_sent(merkle_root, tx_hash)
iex> aggregator_task_sent(merkle_root, tx_hash, effective_gas_price)
:ok
"""
def aggregator_task_sent(merkle_root, tx_hash) do
def aggregator_task_sent(merkle_root, tx_hash, effective_gas_price) do
with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do
Tracer.add_event("Task Sent to Ethereum", [{"tx_hash", tx_hash}])
Tracer.add_event("Task Sent to Ethereum", [{"tx_hash", tx_hash}, {"effective_gas_price", effective_gas_price}])
:ok
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@ defmodule TelemetryApiWeb.TraceController do
end

@doc """
Registers a gas price bump in the trace of the given merkle_root
Method: POST aggregatorTaskGasPriceBump
Registers a gas price in the trace of the given merkle_root
Method: POST aggregatorTaskSetGasPrice
"""
def aggregator_task_gas_price_bumped(conn, %{
def aggregator_task_set_gas_price(conn, %{
"merkle_root" => merkle_root,
"bumped_gas_price" => bumped_gas_price
"gas_price" => gas_price
}) do
with :ok <- Traces.aggregator_task_gas_price_bumped(merkle_root, bumped_gas_price) do
with :ok <- Traces.aggregator_task_set_gas_price(merkle_root, gas_price) do
conn
|> put_status(:ok)
|> render(:show_merkle, merkle_root: merkle_root)
Expand All @@ -141,8 +141,8 @@ defmodule TelemetryApiWeb.TraceController do
Register a task sent, from the aggregator, to Ethereum in the trace of the given merkle_root
Method: POST aggregatorTaskSent
"""
def aggregator_task_sent(conn, %{"merkle_root" => merkle_root, "tx_hash" => tx_hash}) do
with :ok <- Traces.aggregator_task_sent(merkle_root, tx_hash) do
def aggregator_task_sent(conn, %{"merkle_root" => merkle_root, "tx_hash" => tx_hash, "effective_gas_price" => effective_gas_price}) do
with :ok <- Traces.aggregator_task_sent(merkle_root, tx_hash, effective_gas_price) do
conn
|> put_status(:ok)
|> render(:show_merkle, merkle_root: merkle_root)
Expand Down
2 changes: 1 addition & 1 deletion telemetry_api/lib/telemetry_api_web/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule TelemetryApiWeb.Router do
post "/operatorResponse", TraceController, :register_operator_response
post "/quorumReached", TraceController, :quorum_reached
post "/taskError", TraceController, :task_error
post "/aggregatorTaskGasPriceBump", TraceController, :aggregator_task_gas_price_bumped
post "/aggregatorTaskSetGasPrice", TraceController, :aggregator_task_set_gas_price
post "/aggregatorTaskSent", TraceController, :aggregator_task_sent
post "/finishTaskTrace", TraceController, :finish_task_trace

Expand Down

0 comments on commit d0cd0eb

Please sign in to comment.