From a4e8e312f414165d18d7bd3d0eed2a0e340e5ec5 Mon Sep 17 00:00:00 2001 From: Julian Ventura <43799596+JulianVentura@users.noreply.github.com> Date: Tue, 10 Dec 2024 12:38:47 -0300 Subject: [PATCH] feat: add aggregator quorum reached and task responded latency gauges (#1565) Co-authored-by: Julian Ventura --- aggregator/pkg/aggregator.go | 17 ++ .../aligned/aggregator_batcher.json | 230 +++++++++++++++++- metrics/metrics.go | 20 ++ 3 files changed, 261 insertions(+), 6 deletions(-) diff --git a/aggregator/pkg/aggregator.go b/aggregator/pkg/aggregator.go index 467b3b52b..daffcb04b 100644 --- a/aggregator/pkg/aggregator.go +++ b/aggregator/pkg/aggregator.go @@ -67,6 +67,9 @@ type Aggregator struct { // Stores the TaskResponse for each batch by batchIdentifierHash batchDataByIdentifierHash map[[32]byte]BatchData + // Stores the start time for each batch of the aggregator by task index + batchStartTimeByIdx map[uint32]time.Time + // This task index is to communicate with the local BLS // Service. // Note: In case of a reboot it can start from 0 again @@ -78,6 +81,7 @@ type Aggregator struct { // - batchCreatedBlockByIdx // - batchDataByIdentifierHash // - nextBatchIndex + // - batchStartTimeByIdx taskMutex *sync.Mutex // Mutex to protect ethereum wallet @@ -124,6 +128,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error batchesIdxByIdentifierHash := make(map[[32]byte]uint32) batchDataByIdentifierHash := make(map[[32]byte]BatchData) batchCreatedBlockByIdx := make(map[uint32]uint64) + batchStartTimeByIdx := make(map[uint32]time.Time) chainioConfig := sdkclients.BuildAllConfig{ EthHttpUrl: aggregatorConfig.BaseConfig.EthRpcUrl, @@ -172,6 +177,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error batchesIdxByIdentifierHash: batchesIdxByIdentifierHash, batchDataByIdentifierHash: batchDataByIdentifierHash, batchCreatedBlockByIdx: batchCreatedBlockByIdx, + batchStartTimeByIdx: batchStartTimeByIdx, nextBatchIndex: nextBatchIndex, taskMutex: &sync.Mutex{}, walletMutex: &sync.Mutex{}, @@ -233,6 +239,7 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex] batchData := agg.batchDataByIdentifierHash[batchIdentifierHash] taskCreatedBlock := agg.batchCreatedBlockByIdx[blsAggServiceResp.TaskIndex] + taskCreatedAt := agg.batchStartTimeByIdx[blsAggServiceResp.TaskIndex] agg.taskMutex.Unlock() agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Fetching task data") @@ -266,6 +273,9 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA agg.telemetry.LogQuorumReached(batchData.BatchMerkleRoot) + // Only observe quorum reached if successful + agg.metrics.ObserveTaskQuorumReached(time.Since(taskCreatedAt)) + agg.logger.Info("Threshold reached", "taskIndex", blsAggServiceResp.TaskIndex, "batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:])) @@ -320,6 +330,8 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc agg.metrics.IncBumpedGasPriceForAggregatedResponse() agg.telemetry.BumpedTaskGasPrice(batchMerkleRoot, bumpedGasPrice.String()) } + + startTime := time.Now() receipt, err := agg.avsWriter.SendAggregatedResponse( batchIdentifierHash, batchMerkleRoot, @@ -338,6 +350,9 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc return nil, err } + // We only send the latency metric if the response is successul + agg.metrics.ObserveLatencyForRespondToTask(time.Since(startTime)) + agg.walletMutex.Unlock() agg.logger.Infof("- Unlocked Wallet Resources: Sending aggregated response for batch %s", hex.EncodeToString(batchIdentifierHash[:])) @@ -383,6 +398,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by BatchMerkleRoot: batchMerkleRoot, SenderAddress: senderAddress, } + agg.batchStartTimeByIdx[batchIndex] = time.Now() agg.logger.Info( "Task Info added in aggregator:", "Task", batchIndex, @@ -447,6 +463,7 @@ func (agg *Aggregator) ClearTasksFromMaps() { delete(agg.batchCreatedBlockByIdx, i) delete(agg.batchesIdentifierHashByIdx, i) delete(agg.batchDataByIdentifierHash, batchIdentifierHash) + delete(agg.batchStartTimeByIdx, i) } else { agg.logger.Warn("Task not found in maps", "taskIndex", i) } diff --git a/grafana/provisioning/dashboards/aligned/aggregator_batcher.json b/grafana/provisioning/dashboards/aligned/aggregator_batcher.json index 929ff29ea..52de76921 100644 --- a/grafana/provisioning/dashboards/aligned/aggregator_batcher.json +++ b/grafana/provisioning/dashboards/aligned/aggregator_batcher.json @@ -18,7 +18,7 @@ "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 0, - "id": 4, + "id": 2, "links": [], "liveNow": false, "panels": [ @@ -153,7 +153,6 @@ }, { "datasource": { - "default": true, "type": "prometheus", "uid": "prometheus" }, @@ -2451,7 +2450,32 @@ ] } }, - "overrides": [] + "overrides": [ + { + "__systemRef": "hideSeriesFrom", + "matcher": { + "id": "byNames", + "options": { + "mode": "exclude", + "names": [ + "{bot=\"aggregator\", instance=\"host.docker.internal:9091\", job=\"aligned-aggregator\"}" + ], + "prefix": "All except:", + "readOnly": true + } + }, + "properties": [ + { + "id": "custom.hideFrom", + "value": { + "legend": false, + "tooltip": false, + "viz": true + } + } + ] + } + ] }, "gridPos": { "h": 7, @@ -2625,9 +2649,203 @@ } ], "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 61 + }, + "id": 43, + "interval": "1s", + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "right", + "showLegend": false + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "aligned_aggregator_respond_to_task_latency{bot=\"aggregator\"}", + "hide": false, + "instant": false, + "legendFormat": "Latest latency", + "range": true, + "refId": "Latency" + } + ], + "title": "Latest respond to task latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 69 + }, + "id": 44, + "interval": "1s", + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "right", + "showLegend": false + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "aligned_aggregator_task_quorum_reached_latency{bot=\"aggregator\"}", + "hide": false, + "instant": false, + "legendFormat": "Latest latency", + "range": true, + "refId": "A" + } + ], + "title": "Latest quorum reached latency", + "type": "timeseries" } ], - "refresh": "5s", + "refresh": "", "schemaVersion": 38, "style": "dark", "tags": [], @@ -2642,6 +2860,6 @@ "timezone": "browser", "title": "System Data", "uid": "aggregator", - "version": 9, + "version": 19, "weekStart": "" -} \ No newline at end of file +} diff --git a/metrics/metrics.go b/metrics/metrics.go index dda2f7a04..ba0c187fc 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -21,6 +21,8 @@ type Metrics struct { aggregatorGasCostPaidForBatcherTotal prometheus.Gauge aggregatorNumTimesPaidForBatcher prometheus.Counter numBumpedGasPriceForAggregatedResponse prometheus.Counter + aggregatorRespondToTaskLatency prometheus.Gauge + aggregatorTaskQuorumReachedLatency prometheus.Gauge } const alignedNamespace = "aligned" @@ -59,6 +61,16 @@ func NewMetrics(ipPortAddress string, reg prometheus.Registerer, logger logging. Name: "respond_to_task_gas_price_bumped_count", Help: "Number of times gas price was bumped while sending aggregated response", }), + aggregatorRespondToTaskLatency: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Namespace: alignedNamespace, + Name: "aggregator_respond_to_task_latency", + Help: "Latency of last call to respondToTask on Aligned Service Manager", + }), + aggregatorTaskQuorumReachedLatency: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Namespace: alignedNamespace, + Name: "aggregator_task_quorum_reached_latency", + Help: "Time it takes for a task to reach quorum", + }), } } @@ -116,3 +128,11 @@ func (m *Metrics) AddAggregatorGasPaidForBatcher(value float64) { func (m *Metrics) IncBumpedGasPriceForAggregatedResponse() { m.numBumpedGasPriceForAggregatedResponse.Inc() } + +func (m *Metrics) ObserveLatencyForRespondToTask(elapsed time.Duration) { + m.aggregatorRespondToTaskLatency.Set(elapsed.Seconds()) +} + +func (m *Metrics) ObserveTaskQuorumReached(elapsed time.Duration) { + m.aggregatorTaskQuorumReachedLatency.Set(elapsed.Seconds()) +}