Skip to content

Commit

Permalink
feat: PRT - add provider optimizer metrics listener (#1785)
Browse files Browse the repository at this point in the history
* WIP

* PRT - add new flags for optimizer metrics
  • Loading branch information
ranlavanet authored Nov 25, 2024
1 parent 6fff40e commit b36f548
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 17 deletions.
1 change: 1 addition & 0 deletions protocol/common/cobra_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (

// optimizer qos server flags
OptimizerQosServerAddressFlag = "optimizer-qos-server-address" // address of the optimizer qos server to send the qos reports
OptimizerQosListenFlag = "optimizer-qos-listen" // enable listening for qos reports on metrics endpoint
OptimizerQosServerPushIntervalFlag = "optimizer-qos-push-interval" // interval to push the qos reports to the optimizer qos server
OptimizerQosServerSamplingIntervalFlag = "optimizer-qos-sampling-interval" // interval to sample the qos reports
// websocket flags
Expand Down
1 change: 1 addition & 0 deletions protocol/common/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
REPORTED_PROVIDERS_HEADER_NAME = "Lava-Reported-Providers"
USER_REQUEST_TYPE = "lava-user-request-type"
STATEFUL_API_HEADER = "lava-stateful-api"
REQUESTED_BLOCK_HEADER_NAME = "lava-parsed-requested-block"
LAVA_IDENTIFIED_NODE_ERROR_HEADER = "lava-identified-node-error"
LAVAP_VERSION_HEADER_NAME = "Lavap-Version"
LAVA_CONSUMER_PROCESS_GUID = "lava-consumer-process-guid"
Expand Down
47 changes: 45 additions & 2 deletions protocol/metrics/consumer_metrics_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"encoding/json"
"fmt"
"net/http"
"sync"
Expand Down Expand Up @@ -64,11 +65,14 @@ type ConsumerMetricsManager struct {
relayProcessingLatencyBeforeProvider *prometheus.GaugeVec
relayProcessingLatencyAfterProvider *prometheus.GaugeVec
averageProcessingLatency map[string]*LatencyTracker
consumerOptimizerQoSClient *ConsumerOptimizerQoSClient
}

type ConsumerMetricsManagerOptions struct {
NetworkAddress string
AddMethodsApiGauge bool
NetworkAddress string
AddMethodsApiGauge bool
EnableQoSListener bool
ConsumerOptimizerQoSClient *ConsumerOptimizerQoSClient
}

func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerMetricsManager {
Expand Down Expand Up @@ -270,9 +274,24 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
relayProcessingLatencyBeforeProvider: relayProcessingLatencyBeforeProvider,
relayProcessingLatencyAfterProvider: relayProcessingLatencyAfterProvider,
averageProcessingLatency: map[string]*LatencyTracker{},
consumerOptimizerQoSClient: options.ConsumerOptimizerQoSClient,
}

http.Handle("/metrics", promhttp.Handler())
http.HandleFunc("/provider_optimizer_metrics", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
reports := consumerMetricsManager.consumerOptimizerQoSClient.GetReportsToSend()
jsonData, err := json.Marshal(reports)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(jsonData)
})

overallHealthHandler := func(w http.ResponseWriter, r *http.Request) {
statusCode := http.StatusOK
Expand Down Expand Up @@ -545,3 +564,27 @@ func (pme *ConsumerMetricsManager) SetWsSubscriptioDisconnectRequestMetric(chain
}
pme.totalWsSubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, disconnectReason).Inc()
}

func (pme *ConsumerMetricsManager) handleOptimizerQoS(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

var report OptimizerQoSReportToSend
if err := json.NewDecoder(r.Body).Decode(&report); err != nil {
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}

// Process the received QoS report here
utils.LavaFormatDebug("Received QoS report",
utils.LogAttr("provider", report.ProviderAddress),
utils.LogAttr("chain_id", report.ChainId),
utils.LogAttr("sync_score", report.SyncScore),
utils.LogAttr("availability_score", report.AvailabilityScore),
utils.LogAttr("latency_score", report.LatencyScore),
)

w.WriteHeader(http.StatusOK)
}
33 changes: 23 additions & 10 deletions protocol/metrics/consumer_optimizer_qos_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ConsumerOptimizerQoSClient struct {
chainIdToProviderToEpochToStake map[string]map[string]map[uint64]int64 // third key is epoch
currentEpoch atomic.Uint64
lock sync.RWMutex
reportsToSend []OptimizerQoSReportToSend
}

type OptimizerQoSReport struct {
Expand All @@ -41,7 +42,7 @@ type OptimizerQoSReport struct {
EntryIndex int
}

type optimizerQoSReportToSend struct {
type OptimizerQoSReportToSend struct {
Timestamp time.Time `json:"timestamp"`
SyncScore float64 `json:"sync_score"`
AvailabilityScore float64 `json:"availability_score"`
Expand All @@ -56,7 +57,7 @@ type optimizerQoSReportToSend struct {
EntryIndex int `json:"entry_index"`
}

func (oqosr optimizerQoSReportToSend) String() string {
func (oqosr OptimizerQoSReportToSend) String() string {
bytes, err := json.Marshal(oqosr)
if err != nil {
return ""
Expand All @@ -74,7 +75,6 @@ func NewConsumerOptimizerQoSClient(endpointAddress string, interval ...time.Dura
utils.LavaFormatWarning("Error while getting hostname for ConsumerOptimizerQoSClient", err)
hostname = "unknown" + strconv.FormatUint(rand.Uint64(), 10) // random seed for different unknowns
}

return &ConsumerOptimizerQoSClient{
consumerOrigin: hostname,
queueSender: NewQueueSender(endpointAddress, "ConsumerOptimizerQoS", nil, interval...),
Expand Down Expand Up @@ -126,10 +126,9 @@ func (coqc *ConsumerOptimizerQoSClient) calculateNodeErrorRate(chainId, provider
return 0
}

func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *OptimizerQoSReport, chainId string, epoch uint64) {
func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *OptimizerQoSReport, chainId string, epoch uint64) OptimizerQoSReportToSend {
// must be called under read lock

optimizerQoSReportToSend := optimizerQoSReportToSend{
optimizerQoSReportToSend := OptimizerQoSReportToSend{
Timestamp: time.Now(),
ConsumerOrigin: coqc.consumerOrigin,
SyncScore: report.SyncScore,
Expand All @@ -145,9 +144,10 @@ func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *Optimiz
}

coqc.queueSender.appendQueue(optimizerQoSReportToSend)
return optimizerQoSReportToSend
}

func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() {
func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() []OptimizerQoSReportToSend {
coqc.lock.RLock() // we only read from the maps here
defer coqc.lock.RUnlock()

Expand All @@ -156,7 +156,7 @@ func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() {
requestedBlock := spectypes.LATEST_BLOCK

currentEpoch := coqc.currentEpoch.Load()

reportsToSend := []OptimizerQoSReportToSend{}
for chainId, optimizer := range coqc.optimizers {
providersMap, ok := coqc.chainIdToProviderToEpochToStake[chainId]
if !ok {
Expand All @@ -165,9 +165,22 @@ func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() {

reports := optimizer.CalculateQoSScoresForMetrics(maps.Keys(providersMap), ignoredProviders, cu, requestedBlock)
for _, report := range reports {
coqc.appendOptimizerQoSReport(report, chainId, currentEpoch)
reportsToSend = append(reportsToSend, coqc.appendOptimizerQoSReport(report, chainId, currentEpoch))
}
}
return reportsToSend
}

func (coqc *ConsumerOptimizerQoSClient) SetReportsToSend(reports []OptimizerQoSReportToSend) {
coqc.lock.Lock()
defer coqc.lock.Unlock()
coqc.reportsToSend = reports
}

func (coqc *ConsumerOptimizerQoSClient) GetReportsToSend() []OptimizerQoSReportToSend {
coqc.lock.RLock()
defer coqc.lock.RUnlock()
return coqc.reportsToSend
}

func (coqc *ConsumerOptimizerQoSClient) StartOptimizersQoSReportsCollecting(ctx context.Context, samplingInterval time.Duration) {
Expand All @@ -183,7 +196,7 @@ func (coqc *ConsumerOptimizerQoSClient) StartOptimizersQoSReportsCollecting(ctx
utils.LavaFormatTrace("ConsumerOptimizerQoSClient context done")
return
case <-time.After(samplingInterval):
coqc.getReportsFromOptimizers()
coqc.SetReportsToSend(coqc.getReportsFromOptimizers())
}
}
}()
Expand Down
16 changes: 12 additions & 4 deletions protocol/rpcconsumer/rpcconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type AnalyticsServerAddresses struct {
RelayServerAddress string
ReportsAddressFlag string
OptimizerQoSAddress string
OptimizerQoSListen bool
}
type RPCConsumer struct {
consumerStateTracker ConsumerStateTrackerInf
Expand Down Expand Up @@ -133,14 +134,19 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt
}
options.refererData.ReferrerClient = metrics.NewConsumerReferrerClient(options.refererData.Address)
consumerReportsManager := metrics.NewConsumerReportsClient(options.analyticsServerAddresses.ReportsAddressFlag)
consumerMetricsManager := metrics.NewConsumerMetricsManager(metrics.ConsumerMetricsManagerOptions{NetworkAddress: options.analyticsServerAddresses.MetricsListenAddress, AddMethodsApiGauge: options.analyticsServerAddresses.AddApiMethodCallsMetrics}) // start up prometheus metrics
consumerUsageServeManager := metrics.NewConsumerRelayServerClient(options.analyticsServerAddresses.RelayServerAddress) // start up relay server reporting

consumerUsageServeManager := metrics.NewConsumerRelayServerClient(options.analyticsServerAddresses.RelayServerAddress) // start up relay server reporting
var consumerOptimizerQoSClient *metrics.ConsumerOptimizerQoSClient
if options.analyticsServerAddresses.OptimizerQoSAddress != "" {
if options.analyticsServerAddresses.OptimizerQoSAddress != "" || options.analyticsServerAddresses.OptimizerQoSListen {
consumerOptimizerQoSClient = metrics.NewConsumerOptimizerQoSClient(options.analyticsServerAddresses.OptimizerQoSAddress, metrics.OptimizerQosServerPushInterval) // start up optimizer qos client
consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(ctx, metrics.OptimizerQosServerSamplingInterval) // start up optimizer qos client
}

consumerMetricsManager := metrics.NewConsumerMetricsManager(metrics.ConsumerMetricsManagerOptions{
NetworkAddress: options.analyticsServerAddresses.MetricsListenAddress,
AddMethodsApiGauge: options.analyticsServerAddresses.AddApiMethodCallsMetrics,
EnableQoSListener: options.analyticsServerAddresses.OptimizerQoSListen,
ConsumerOptimizerQoSClient: consumerOptimizerQoSClient,
}) // start up prometheus metrics
rpcConsumerMetrics, err := metrics.NewRPCConsumerLogs(consumerMetricsManager, consumerUsageServeManager, consumerOptimizerQoSClient)
if err != nil {
utils.LavaFormatFatal("failed creating RPCConsumer logs", err)
Expand Down Expand Up @@ -557,6 +563,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77
RelayServerAddress: viper.GetString(metrics.RelayServerFlagName),
ReportsAddressFlag: viper.GetString(reportsSendBEAddress),
OptimizerQoSAddress: viper.GetString(common.OptimizerQosServerAddressFlag),
OptimizerQoSListen: viper.GetBool(common.OptimizerQosListenFlag),
}

var refererData *chainlib.RefererData
Expand Down Expand Up @@ -646,6 +653,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77
cmdRPCConsumer.Flags().IntVar(&provideroptimizer.OptimizerNumTiers, common.SetProviderOptimizerNumberOfTiersToCreate, 4, "set the number of groups to create, default is 4")
// optimizer qos reports
cmdRPCConsumer.Flags().String(common.OptimizerQosServerAddressFlag, "", "address to send optimizer qos reports to")
cmdRPCConsumer.Flags().Bool(common.OptimizerQosListenFlag, false, "enable listening for optimizer qos reports on metrics endpoint i.e GET -> localhost:7779/provider_optimizer_metrics")
cmdRPCConsumer.Flags().DurationVar(&metrics.OptimizerQosServerPushInterval, common.OptimizerQosServerPushIntervalFlag, time.Minute*5, "interval to push optimizer qos reports")
cmdRPCConsumer.Flags().DurationVar(&metrics.OptimizerQosServerSamplingInterval, common.OptimizerQosServerSamplingIntervalFlag, time.Second*1, "interval to sample optimizer qos reports")
cmdRPCConsumer.Flags().IntVar(&chainlib.WebSocketRateLimit, common.RateLimitWebSocketFlag, chainlib.WebSocketRateLimit, "rate limit (per second) websocket requests per user connection, default is unlimited")
Expand Down
6 changes: 6 additions & 0 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1485,6 +1485,12 @@ func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context,
directiveHeaders := protocolMessage.GetDirectiveHeaders()
_, debugRelays := directiveHeaders[common.LAVA_DEBUG_RELAY]
if debugRelays {
metadataReply = append(metadataReply,
pairingtypes.Metadata{
Name: common.REQUESTED_BLOCK_HEADER_NAME,
Value: strconv.FormatInt(protocolMessage.RelayPrivateData().GetRequestBlock(), 10),
})

routerKey := lavasession.NewRouterKeyFromExtensions(protocolMessage.GetExtensions())
erroredProviders := relayProcessor.GetUsedProviders().GetErroredProviders(routerKey)
if len(erroredProviders) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion scripts/pre_setups/init_lava_only_with_node.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ wait_next_block

screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \
127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \
$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level trace --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25
$EXTRA_PORTAL_FLAGS --geolocation 1 --optimizer-qos-listen --log_level trace --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25

echo "--- setting up screens done ---"
screen -ls

0 comments on commit b36f548

Please sign in to comment.