Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improve jaeger traces on the aggregator #1603

Merged
merged 1 commit into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions aggregator/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,12 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
if err == nil {
// In some cases, we may fail to retrieve the receipt for the transaction.
txHash := "Unknown"
effectiveGasPrice := "Unknown"
JuArce marked this conversation as resolved.
Show resolved Hide resolved
if receipt != nil {
txHash = receipt.TxHash.String()
effectiveGasPrice = receipt.EffectiveGasPrice.String()
}
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, txHash)
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, txHash, effectiveGasPrice)
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
Expand Down Expand Up @@ -316,9 +318,8 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))

// This function is a callback that is called when the gas price is bumped on the avsWriter.SendAggregatedResponse
onGasPriceBumped := func(bumpedGasPrice *big.Int) {
agg.metrics.IncBumpedGasPriceForAggregatedResponse()
agg.telemetry.BumpedTaskGasPrice(batchMerkleRoot, bumpedGasPrice.String())
onSetGasPrice := func(gasPrice *big.Int) {
agg.telemetry.TaskSetGasPrice(batchMerkleRoot, gasPrice.String())
}
receipt, err := agg.avsWriter.SendAggregatedResponse(
batchIdentifierHash,
Expand All @@ -329,12 +330,12 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
agg.AggregatorConfig.Aggregator.GasBumpIncrementalPercentage,
agg.AggregatorConfig.Aggregator.GasBumpPercentageLimit,
agg.AggregatorConfig.Aggregator.TimeToWaitBeforeBump,
onGasPriceBumped,
agg.metrics,
onSetGasPrice,
)
if err != nil {
agg.walletMutex.Unlock()
agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchIdentifierHash[:]), err)
agg.telemetry.LogTaskError(batchMerkleRoot, err)
return nil, err
}

Expand Down
28 changes: 15 additions & 13 deletions aggregator/pkg/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ type TaskErrorMessage struct {
TaskError string `json:"error"`
}

type TaskGasPriceBumpMessage struct {
MerkleRoot string `json:"merkle_root"`
BumpedGasPrice string `json:"bumped_gas_price"`
type TaskSetGasPriceMessage struct {
MerkleRoot string `json:"merkle_root"`
GasPrice string `json:"gas_price"`
}

type TaskSentToEthereumMessage struct {
MerkleRoot string `json:"merkle_root"`
TxHash string `json:"tx_hash"`
MerkleRoot string `json:"merkle_root"`
TxHash string `json:"tx_hash"`
EffectiveGasPrice string `json:"effective_gas_price"`
}

type Telemetry struct {
Expand Down Expand Up @@ -101,20 +102,21 @@ func (t *Telemetry) LogTaskError(batchMerkleRoot [32]byte, taskError error) {
}
}

func (t *Telemetry) BumpedTaskGasPrice(batchMerkleRoot [32]byte, bumpedGasPrice string) {
body := TaskGasPriceBumpMessage{
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
BumpedGasPrice: bumpedGasPrice,
func (t *Telemetry) TaskSetGasPrice(batchMerkleRoot [32]byte, gasPrice string) {
body := TaskSetGasPriceMessage{
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
GasPrice: gasPrice,
}
if err := t.sendTelemetryMessage("/api/aggregatorTaskGasPriceBump", body); err != nil {
if err := t.sendTelemetryMessage("/api/aggregatorTaskSetGasPrice", body); err != nil {
t.logger.Warn("[Telemetry] Error in LogOperatorResponse", "error", err)
}
}

func (t *Telemetry) TaskSentToEthereum(batchMerkleRoot [32]byte, txHash string) {
func (t *Telemetry) TaskSentToEthereum(batchMerkleRoot [32]byte, txHash string, effectiveGasPrice string) {
body := TaskSentToEthereumMessage{
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
TxHash: txHash,
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
TxHash: txHash,
EffectiveGasPrice: effectiveGasPrice,
}
if err := t.sendTelemetryMessage("/api/aggregatorTaskSent", body); err != nil {
t.logger.Warn("[Telemetry] Error in TaskSentToEthereum", "error", err)
Expand Down
6 changes: 4 additions & 2 deletions core/chainio/avs_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func NewAvsWriterFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.E
// - If no receipt is found, but the batch state indicates the response has already been processed, it exits
// without an error (returning `nil, nil`).
// - An error if the process encounters a fatal issue (e.g., permanent failure in verifying balances or state).
func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, gasBumpPercentage uint, gasBumpIncrementalPercentage uint, gasBumpPercentageLimit uint, timeToWaitBeforeBump time.Duration, onGasPriceBumped func(*big.Int)) (*types.Receipt, error) {
func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, gasBumpPercentage uint, gasBumpIncrementalPercentage uint, gasBumpPercentageLimit uint, timeToWaitBeforeBump time.Duration, metrics *metrics.Metrics, onSetGasPrice func(*big.Int)) (*types.Receipt, error) {
txOpts := *w.Signer.GetTxOpts()
txOpts.NoSend = true // simulate the transaction
simTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature, retry.SendToChainRetryParams())
Expand Down Expand Up @@ -141,6 +141,8 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
txOpts.GasPrice = minimumGasPriceBump
}

onSetGasPrice(txOpts.GasPrice)

if i > 0 {
w.logger.Infof("Trying to get old sent transaction receipt before sending a new transaction", "merkle root", batchMerkleRootHashString)
for _, tx := range sentTxs {
Expand All @@ -161,7 +163,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
}
w.logger.Infof("Batch state has not been responded yet, will send a new tx", "merkle root", batchMerkleRootHashString)

onGasPriceBumped(txOpts.GasPrice)
metrics.IncBumpedGasPriceForAggregatedResponse()
}

// We compare both Aggregator funds and Batcher balance in Aligned against respondToTaskFeeLimit
Expand Down
16 changes: 8 additions & 8 deletions telemetry_api/lib/telemetry_api/traces.ex
Original file line number Diff line number Diff line change
Expand Up @@ -300,18 +300,18 @@ defmodule TelemetryApi.Traces do
end

@doc """
Registers a bump in the gas price when the aggregator tries to respond to a task in the task trace.
Registers a set gas price when the aggregator tries to respond to a task in the task trace.

## Examples

iex> merkle_root
iex> bumped_gas_price
iex> aggregator_task_gas_price_bumped(merkle_root, bumped_gas_price)
iex> gas_price
iex> aggregator_task_set_gas_price(merkle_root, gas_price)
:ok
"""
def aggregator_task_gas_price_bumped(merkle_root, bumped_gas_price) do
def aggregator_task_set_gas_price(merkle_root, gas_price) do
with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do
Tracer.add_event("Task gas price bumped", [{"bumped__gas_price", bumped_gas_price}])
Tracer.add_event("Gas price set", [{"gas_price", gas_price}])
:ok
end
end
Expand All @@ -323,12 +323,12 @@ defmodule TelemetryApi.Traces do

iex> merkle_root
iex> tx_hash
iex> aggregator_task_sent(merkle_root, tx_hash)
iex> aggregator_task_sent(merkle_root, tx_hash, effective_gas_price)
:ok
"""
def aggregator_task_sent(merkle_root, tx_hash) do
def aggregator_task_sent(merkle_root, tx_hash, effective_gas_price) do
with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do
Tracer.add_event("Task Sent to Ethereum", [{"tx_hash", tx_hash}])
Tracer.add_event("Task Sent to Ethereum", [{"tx_hash", tx_hash}, {"effective_gas_price", effective_gas_price}])
:ok
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@ defmodule TelemetryApiWeb.TraceController do
end

@doc """
Registers a gas price bump in the trace of the given merkle_root
Method: POST aggregatorTaskGasPriceBump
Registers a gas price in the trace of the given merkle_root
Method: POST aggregatorTaskSetGasPrice
"""
def aggregator_task_gas_price_bumped(conn, %{
def aggregator_task_set_gas_price(conn, %{
"merkle_root" => merkle_root,
"bumped_gas_price" => bumped_gas_price
"gas_price" => gas_price
}) do
with :ok <- Traces.aggregator_task_gas_price_bumped(merkle_root, bumped_gas_price) do
with :ok <- Traces.aggregator_task_set_gas_price(merkle_root, gas_price) do
conn
|> put_status(:ok)
|> render(:show_merkle, merkle_root: merkle_root)
Expand All @@ -141,8 +141,8 @@ defmodule TelemetryApiWeb.TraceController do
Register a task sent, from the aggregator, to Ethereum in the trace of the given merkle_root
Method: POST aggregatorTaskSent
"""
def aggregator_task_sent(conn, %{"merkle_root" => merkle_root, "tx_hash" => tx_hash}) do
with :ok <- Traces.aggregator_task_sent(merkle_root, tx_hash) do
def aggregator_task_sent(conn, %{"merkle_root" => merkle_root, "tx_hash" => tx_hash, "effective_gas_price" => effective_gas_price}) do
with :ok <- Traces.aggregator_task_sent(merkle_root, tx_hash, effective_gas_price) do
conn
|> put_status(:ok)
|> render(:show_merkle, merkle_root: merkle_root)
Expand Down
2 changes: 1 addition & 1 deletion telemetry_api/lib/telemetry_api_web/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule TelemetryApiWeb.Router do
post "/operatorResponse", TraceController, :register_operator_response
post "/quorumReached", TraceController, :quorum_reached
post "/taskError", TraceController, :task_error
post "/aggregatorTaskGasPriceBump", TraceController, :aggregator_task_gas_price_bumped
post "/aggregatorTaskSetGasPrice", TraceController, :aggregator_task_set_gas_price
post "/aggregatorTaskSent", TraceController, :aggregator_task_sent
post "/finishTaskTrace", TraceController, :finish_task_trace

Expand Down
Loading