Skip to content

Commit

Permalink
feat: add aggregator quorum reached and task responded latency gauges (
Browse files Browse the repository at this point in the history
…#1565)

Co-authored-by: Julian Ventura <[email protected]>
  • Loading branch information
JulianVentura and Julian Ventura authored Dec 10, 2024
1 parent 75c34a8 commit a4e8e31
Show file tree
Hide file tree
Showing 3 changed files with 261 additions and 6 deletions.
17 changes: 17 additions & 0 deletions aggregator/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type Aggregator struct {
// Stores the TaskResponse for each batch by batchIdentifierHash
batchDataByIdentifierHash map[[32]byte]BatchData

// Stores the start time for each batch of the aggregator by task index
batchStartTimeByIdx map[uint32]time.Time

// This task index is to communicate with the local BLS
// Service.
// Note: In case of a reboot it can start from 0 again
Expand All @@ -78,6 +81,7 @@ type Aggregator struct {
// - batchCreatedBlockByIdx
// - batchDataByIdentifierHash
// - nextBatchIndex
// - batchStartTimeByIdx
taskMutex *sync.Mutex

// Mutex to protect ethereum wallet
Expand Down Expand Up @@ -124,6 +128,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
batchesIdxByIdentifierHash := make(map[[32]byte]uint32)
batchDataByIdentifierHash := make(map[[32]byte]BatchData)
batchCreatedBlockByIdx := make(map[uint32]uint64)
batchStartTimeByIdx := make(map[uint32]time.Time)

chainioConfig := sdkclients.BuildAllConfig{
EthHttpUrl: aggregatorConfig.BaseConfig.EthRpcUrl,
Expand Down Expand Up @@ -172,6 +177,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
batchesIdxByIdentifierHash: batchesIdxByIdentifierHash,
batchDataByIdentifierHash: batchDataByIdentifierHash,
batchCreatedBlockByIdx: batchCreatedBlockByIdx,
batchStartTimeByIdx: batchStartTimeByIdx,
nextBatchIndex: nextBatchIndex,
taskMutex: &sync.Mutex{},
walletMutex: &sync.Mutex{},
Expand Down Expand Up @@ -233,6 +239,7 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
batchData := agg.batchDataByIdentifierHash[batchIdentifierHash]
taskCreatedBlock := agg.batchCreatedBlockByIdx[blsAggServiceResp.TaskIndex]
taskCreatedAt := agg.batchStartTimeByIdx[blsAggServiceResp.TaskIndex]
agg.taskMutex.Unlock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Fetching task data")

Expand Down Expand Up @@ -266,6 +273,9 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA

agg.telemetry.LogQuorumReached(batchData.BatchMerkleRoot)

// Only observe quorum reached if successful
agg.metrics.ObserveTaskQuorumReached(time.Since(taskCreatedAt))

agg.logger.Info("Threshold reached", "taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

Expand Down Expand Up @@ -320,6 +330,8 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
agg.metrics.IncBumpedGasPriceForAggregatedResponse()
agg.telemetry.BumpedTaskGasPrice(batchMerkleRoot, bumpedGasPrice.String())
}

startTime := time.Now()
receipt, err := agg.avsWriter.SendAggregatedResponse(
batchIdentifierHash,
batchMerkleRoot,
Expand All @@ -338,6 +350,9 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
return nil, err
}

// We only send the latency metric if the response is successul
agg.metrics.ObserveLatencyForRespondToTask(time.Since(startTime))

agg.walletMutex.Unlock()
agg.logger.Infof("- Unlocked Wallet Resources: Sending aggregated response for batch %s", hex.EncodeToString(batchIdentifierHash[:]))

Expand Down Expand Up @@ -383,6 +398,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
BatchMerkleRoot: batchMerkleRoot,
SenderAddress: senderAddress,
}
agg.batchStartTimeByIdx[batchIndex] = time.Now()
agg.logger.Info(
"Task Info added in aggregator:",
"Task", batchIndex,
Expand Down Expand Up @@ -447,6 +463,7 @@ func (agg *Aggregator) ClearTasksFromMaps() {
delete(agg.batchCreatedBlockByIdx, i)
delete(agg.batchesIdentifierHashByIdx, i)
delete(agg.batchDataByIdentifierHash, batchIdentifierHash)
delete(agg.batchStartTimeByIdx, i)
} else {
agg.logger.Warn("Task not found in maps", "taskIndex", i)
}
Expand Down
230 changes: 224 additions & 6 deletions grafana/provisioning/dashboards/aligned/aggregator_batcher.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 4,
"id": 2,
"links": [],
"liveNow": false,
"panels": [
Expand Down Expand Up @@ -153,7 +153,6 @@
},
{
"datasource": {
"default": true,
"type": "prometheus",
"uid": "prometheus"
},
Expand Down Expand Up @@ -2451,7 +2450,32 @@
]
}
},
"overrides": []
"overrides": [
{
"__systemRef": "hideSeriesFrom",
"matcher": {
"id": "byNames",
"options": {
"mode": "exclude",
"names": [
"{bot=\"aggregator\", instance=\"host.docker.internal:9091\", job=\"aligned-aggregator\"}"
],
"prefix": "All except:",
"readOnly": true
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": false,
"tooltip": false,
"viz": true
}
}
]
}
]
},
"gridPos": {
"h": 7,
Expand Down Expand Up @@ -2625,9 +2649,203 @@
}
],
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "prometheus"
},
"description": "",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 61
},
"id": 43,
"interval": "1s",
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "right",
"showLegend": false
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "prometheus"
},
"editorMode": "code",
"expr": "aligned_aggregator_respond_to_task_latency{bot=\"aggregator\"}",
"hide": false,
"instant": false,
"legendFormat": "Latest latency",
"range": true,
"refId": "Latency"
}
],
"title": "Latest respond to task latency",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "prometheus"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 69
},
"id": 44,
"interval": "1s",
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "right",
"showLegend": false
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "prometheus"
},
"editorMode": "code",
"expr": "aligned_aggregator_task_quorum_reached_latency{bot=\"aggregator\"}",
"hide": false,
"instant": false,
"legendFormat": "Latest latency",
"range": true,
"refId": "A"
}
],
"title": "Latest quorum reached latency",
"type": "timeseries"
}
],
"refresh": "5s",
"refresh": "",
"schemaVersion": 38,
"style": "dark",
"tags": [],
Expand All @@ -2642,6 +2860,6 @@
"timezone": "browser",
"title": "System Data",
"uid": "aggregator",
"version": 9,
"version": 19,
"weekStart": ""
}
}
20 changes: 20 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type Metrics struct {
aggregatorGasCostPaidForBatcherTotal prometheus.Gauge
aggregatorNumTimesPaidForBatcher prometheus.Counter
numBumpedGasPriceForAggregatedResponse prometheus.Counter
aggregatorRespondToTaskLatency prometheus.Gauge
aggregatorTaskQuorumReachedLatency prometheus.Gauge
}

const alignedNamespace = "aligned"
Expand Down Expand Up @@ -59,6 +61,16 @@ func NewMetrics(ipPortAddress string, reg prometheus.Registerer, logger logging.
Name: "respond_to_task_gas_price_bumped_count",
Help: "Number of times gas price was bumped while sending aggregated response",
}),
aggregatorRespondToTaskLatency: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Namespace: alignedNamespace,
Name: "aggregator_respond_to_task_latency",
Help: "Latency of last call to respondToTask on Aligned Service Manager",
}),
aggregatorTaskQuorumReachedLatency: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Namespace: alignedNamespace,
Name: "aggregator_task_quorum_reached_latency",
Help: "Time it takes for a task to reach quorum",
}),
}
}

Expand Down Expand Up @@ -116,3 +128,11 @@ func (m *Metrics) AddAggregatorGasPaidForBatcher(value float64) {
func (m *Metrics) IncBumpedGasPriceForAggregatedResponse() {
m.numBumpedGasPriceForAggregatedResponse.Inc()
}

func (m *Metrics) ObserveLatencyForRespondToTask(elapsed time.Duration) {
m.aggregatorRespondToTaskLatency.Set(elapsed.Seconds())
}

func (m *Metrics) ObserveTaskQuorumReached(elapsed time.Duration) {
m.aggregatorTaskQuorumReachedLatency.Set(elapsed.Seconds())
}

0 comments on commit a4e8e31

Please sign in to comment.