Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: PRT - add provider optimizer metrics listener #1785

Merged
merged 3 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions protocol/common/cobra_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (

// optimizer qos server flags
OptimizerQosServerAddressFlag = "optimizer-qos-server-address" // address of the optimizer qos server to send the qos reports
OptimizerQosListenFlag = "optimizer-qos-listen" // enable listening for qos reports on metrics endpoint
OptimizerQosServerPushIntervalFlag = "optimizer-qos-push-interval" // interval to push the qos reports to the optimizer qos server
OptimizerQosServerSamplingIntervalFlag = "optimizer-qos-sampling-interval" // interval to sample the qos reports
// websocket flags
Expand Down
1 change: 1 addition & 0 deletions protocol/common/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
REPORTED_PROVIDERS_HEADER_NAME = "Lava-Reported-Providers"
USER_REQUEST_TYPE = "lava-user-request-type"
STATEFUL_API_HEADER = "lava-stateful-api"
REQUESTED_BLOCK_HEADER_NAME = "lava-parsed-requested-block"
LAVA_IDENTIFIED_NODE_ERROR_HEADER = "lava-identified-node-error"
LAVAP_VERSION_HEADER_NAME = "Lavap-Version"
LAVA_CONSUMER_PROCESS_GUID = "lava-consumer-process-guid"
Expand Down
47 changes: 45 additions & 2 deletions protocol/metrics/consumer_metrics_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"encoding/json"
"fmt"
"net/http"
"sync"
Expand Down Expand Up @@ -64,11 +65,14 @@ type ConsumerMetricsManager struct {
relayProcessingLatencyBeforeProvider *prometheus.GaugeVec
relayProcessingLatencyAfterProvider *prometheus.GaugeVec
averageProcessingLatency map[string]*LatencyTracker
consumerOptimizerQoSClient *ConsumerOptimizerQoSClient
}

type ConsumerMetricsManagerOptions struct {
NetworkAddress string
AddMethodsApiGauge bool
NetworkAddress string
AddMethodsApiGauge bool
EnableQoSListener bool
ConsumerOptimizerQoSClient *ConsumerOptimizerQoSClient
}

func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerMetricsManager {
Expand Down Expand Up @@ -270,9 +274,24 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
relayProcessingLatencyBeforeProvider: relayProcessingLatencyBeforeProvider,
relayProcessingLatencyAfterProvider: relayProcessingLatencyAfterProvider,
averageProcessingLatency: map[string]*LatencyTracker{},
consumerOptimizerQoSClient: options.ConsumerOptimizerQoSClient,
}

http.Handle("/metrics", promhttp.Handler())
// Serve the most recently sampled provider optimizer QoS reports as JSON (GET only).
http.HandleFunc("/provider_optimizer_metrics", func(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	// The QoS client is optional: the options may carry a nil client, and calling
	// GetReportsToSend on a nil client dereferences its lock and panics in this handler.
	qosClient := consumerMetricsManager.consumerOptimizerQoSClient
	if qosClient == nil {
		http.Error(w, "Optimizer QoS reports are not enabled", http.StatusServiceUnavailable)
		return
	}
	reports := qosClient.GetReportsToSend()
	jsonData, err := json.Marshal(reports)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write(jsonData)
})

overallHealthHandler := func(w http.ResponseWriter, r *http.Request) {
statusCode := http.StatusOK
Expand Down Expand Up @@ -545,3 +564,27 @@ func (pme *ConsumerMetricsManager) SetWsSubscriptioDisconnectRequestMetric(chain
}
pme.totalWsSubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, disconnectReason).Inc()
}

// handleOptimizerQoS is an HTTP handler that accepts a single JSON-encoded
// OptimizerQoSReportToSend via POST and logs its main fields at debug level.
//
// Responses:
//   - 405 when the request method is not POST
//   - 400 when the body cannot be decoded into a report
//   - 200 once the report has been logged
func (pme *ConsumerMetricsManager) handleOptimizerQoS(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodPost:
		// accepted, continue below
	default:
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	qosReport := OptimizerQoSReportToSend{}
	decodeErr := json.NewDecoder(r.Body).Decode(&qosReport)
	if decodeErr != nil {
		http.Error(w, "Invalid request body", http.StatusBadRequest)
		return
	}

	// The report is only logged here; nothing is stored or forwarded.
	utils.LavaFormatDebug(
		"Received QoS report",
		utils.LogAttr("provider", qosReport.ProviderAddress),
		utils.LogAttr("chain_id", qosReport.ChainId),
		utils.LogAttr("sync_score", qosReport.SyncScore),
		utils.LogAttr("availability_score", qosReport.AvailabilityScore),
		utils.LogAttr("latency_score", qosReport.LatencyScore),
	)

	w.WriteHeader(http.StatusOK)
}
33 changes: 23 additions & 10 deletions protocol/metrics/consumer_optimizer_qos_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ConsumerOptimizerQoSClient struct {
chainIdToProviderToEpochToStake map[string]map[string]map[uint64]int64 // third key is epoch
currentEpoch atomic.Uint64
lock sync.RWMutex
reportsToSend []OptimizerQoSReportToSend
}

type OptimizerQoSReport struct {
Expand All @@ -41,7 +42,7 @@ type OptimizerQoSReport struct {
EntryIndex int
}

type optimizerQoSReportToSend struct {
type OptimizerQoSReportToSend struct {
Timestamp time.Time `json:"timestamp"`
SyncScore float64 `json:"sync_score"`
AvailabilityScore float64 `json:"availability_score"`
Expand All @@ -56,7 +57,7 @@ type optimizerQoSReportToSend struct {
EntryIndex int `json:"entry_index"`
}

func (oqosr optimizerQoSReportToSend) String() string {
func (oqosr OptimizerQoSReportToSend) String() string {
bytes, err := json.Marshal(oqosr)
if err != nil {
return ""
Expand All @@ -74,7 +75,6 @@ func NewConsumerOptimizerQoSClient(endpointAddress string, interval ...time.Dura
utils.LavaFormatWarning("Error while getting hostname for ConsumerOptimizerQoSClient", err)
hostname = "unknown" + strconv.FormatUint(rand.Uint64(), 10) // random seed for different unknowns
}

return &ConsumerOptimizerQoSClient{
consumerOrigin: hostname,
queueSender: NewQueueSender(endpointAddress, "ConsumerOptimizerQoS", nil, interval...),
Expand Down Expand Up @@ -126,10 +126,9 @@ func (coqc *ConsumerOptimizerQoSClient) calculateNodeErrorRate(chainId, provider
return 0
}

func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *OptimizerQoSReport, chainId string, epoch uint64) {
func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *OptimizerQoSReport, chainId string, epoch uint64) OptimizerQoSReportToSend {
// must be called under read lock

optimizerQoSReportToSend := optimizerQoSReportToSend{
optimizerQoSReportToSend := OptimizerQoSReportToSend{
Timestamp: time.Now(),
ConsumerOrigin: coqc.consumerOrigin,
SyncScore: report.SyncScore,
Expand All @@ -145,9 +144,10 @@ func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *Optimiz
}

coqc.queueSender.appendQueue(optimizerQoSReportToSend)
return optimizerQoSReportToSend
}

func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() {
func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() []OptimizerQoSReportToSend {
coqc.lock.RLock() // we only read from the maps here
defer coqc.lock.RUnlock()

Expand All @@ -156,7 +156,7 @@ func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() {
requestedBlock := spectypes.LATEST_BLOCK

currentEpoch := coqc.currentEpoch.Load()

reportsToSend := []OptimizerQoSReportToSend{}
for chainId, optimizer := range coqc.optimizers {
providersMap, ok := coqc.chainIdToProviderToEpochToStake[chainId]
if !ok {
Expand All @@ -165,9 +165,22 @@ func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() {

reports := optimizer.CalculateQoSScoresForMetrics(maps.Keys(providersMap), ignoredProviders, cu, requestedBlock)
for _, report := range reports {
coqc.appendOptimizerQoSReport(report, chainId, currentEpoch)
reportsToSend = append(reportsToSend, coqc.appendOptimizerQoSReport(report, chainId, currentEpoch))
}
}
return reportsToSend
}

// SetReportsToSend replaces the stored reports with the given slice.
// The assignment is performed while holding the client's write lock.
// The slice is stored as-is (not copied).
func (coqc *ConsumerOptimizerQoSClient) SetReportsToSend(reports []OptimizerQoSReportToSend) {
	coqc.lock.Lock()
	coqc.reportsToSend = reports
	coqc.lock.Unlock()
}

// GetReportsToSend returns the reports last stored via SetReportsToSend.
// The field is read while holding the client's read lock; the returned slice
// is the stored slice itself, not a copy.
func (coqc *ConsumerOptimizerQoSClient) GetReportsToSend() []OptimizerQoSReportToSend {
	coqc.lock.RLock()
	latest := coqc.reportsToSend
	coqc.lock.RUnlock()
	return latest
}

func (coqc *ConsumerOptimizerQoSClient) StartOptimizersQoSReportsCollecting(ctx context.Context, samplingInterval time.Duration) {
Expand All @@ -183,7 +196,7 @@ func (coqc *ConsumerOptimizerQoSClient) StartOptimizersQoSReportsCollecting(ctx
utils.LavaFormatTrace("ConsumerOptimizerQoSClient context done")
return
case <-time.After(samplingInterval):
coqc.getReportsFromOptimizers()
coqc.SetReportsToSend(coqc.getReportsFromOptimizers())
}
}
}()
Expand Down
16 changes: 12 additions & 4 deletions protocol/rpcconsumer/rpcconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type AnalyticsServerAddresses struct {
RelayServerAddress string
ReportsAddressFlag string
OptimizerQoSAddress string
OptimizerQoSListen bool
}
type RPCConsumer struct {
consumerStateTracker ConsumerStateTrackerInf
Expand Down Expand Up @@ -133,14 +134,19 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt
}
options.refererData.ReferrerClient = metrics.NewConsumerReferrerClient(options.refererData.Address)
consumerReportsManager := metrics.NewConsumerReportsClient(options.analyticsServerAddresses.ReportsAddressFlag)
consumerMetricsManager := metrics.NewConsumerMetricsManager(metrics.ConsumerMetricsManagerOptions{NetworkAddress: options.analyticsServerAddresses.MetricsListenAddress, AddMethodsApiGauge: options.analyticsServerAddresses.AddApiMethodCallsMetrics}) // start up prometheus metrics
consumerUsageServeManager := metrics.NewConsumerRelayServerClient(options.analyticsServerAddresses.RelayServerAddress) // start up relay server reporting

consumerUsageServeManager := metrics.NewConsumerRelayServerClient(options.analyticsServerAddresses.RelayServerAddress) // start up relay server reporting
var consumerOptimizerQoSClient *metrics.ConsumerOptimizerQoSClient
if options.analyticsServerAddresses.OptimizerQoSAddress != "" {
if options.analyticsServerAddresses.OptimizerQoSAddress != "" || options.analyticsServerAddresses.OptimizerQoSListen {
consumerOptimizerQoSClient = metrics.NewConsumerOptimizerQoSClient(options.analyticsServerAddresses.OptimizerQoSAddress, metrics.OptimizerQosServerPushInterval) // start up optimizer qos client
consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(ctx, metrics.OptimizerQosServerSamplingInterval) // start up optimizer qos client
}

consumerMetricsManager := metrics.NewConsumerMetricsManager(metrics.ConsumerMetricsManagerOptions{
NetworkAddress: options.analyticsServerAddresses.MetricsListenAddress,
AddMethodsApiGauge: options.analyticsServerAddresses.AddApiMethodCallsMetrics,
EnableQoSListener: options.analyticsServerAddresses.OptimizerQoSListen,
ConsumerOptimizerQoSClient: consumerOptimizerQoSClient,
}) // start up prometheus metrics
rpcConsumerMetrics, err := metrics.NewRPCConsumerLogs(consumerMetricsManager, consumerUsageServeManager, consumerOptimizerQoSClient)
if err != nil {
utils.LavaFormatFatal("failed creating RPCConsumer logs", err)
Expand Down Expand Up @@ -557,6 +563,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77
RelayServerAddress: viper.GetString(metrics.RelayServerFlagName),
ReportsAddressFlag: viper.GetString(reportsSendBEAddress),
OptimizerQoSAddress: viper.GetString(common.OptimizerQosServerAddressFlag),
OptimizerQoSListen: viper.GetBool(common.OptimizerQosListenFlag),
}

var refererData *chainlib.RefererData
Expand Down Expand Up @@ -646,6 +653,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77
cmdRPCConsumer.Flags().IntVar(&provideroptimizer.OptimizerNumTiers, common.SetProviderOptimizerNumberOfTiersToCreate, 4, "set the number of groups to create, default is 4")
// optimizer qos reports
cmdRPCConsumer.Flags().String(common.OptimizerQosServerAddressFlag, "", "address to send optimizer qos reports to")
cmdRPCConsumer.Flags().Bool(common.OptimizerQosListenFlag, false, "enable listening for optimizer qos reports on metrics endpoint i.e GET -> localhost:7779/provider_optimizer_metrics")
cmdRPCConsumer.Flags().DurationVar(&metrics.OptimizerQosServerPushInterval, common.OptimizerQosServerPushIntervalFlag, time.Minute*5, "interval to push optimizer qos reports")
cmdRPCConsumer.Flags().DurationVar(&metrics.OptimizerQosServerSamplingInterval, common.OptimizerQosServerSamplingIntervalFlag, time.Second*1, "interval to sample optimizer qos reports")
cmdRPCConsumer.Flags().IntVar(&chainlib.WebSocketRateLimit, common.RateLimitWebSocketFlag, chainlib.WebSocketRateLimit, "rate limit (per second) websocket requests per user connection, default is unlimited")
Expand Down
6 changes: 6 additions & 0 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1485,6 +1485,12 @@ func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context,
directiveHeaders := protocolMessage.GetDirectiveHeaders()
_, debugRelays := directiveHeaders[common.LAVA_DEBUG_RELAY]
if debugRelays {
metadataReply = append(metadataReply,
pairingtypes.Metadata{
Name: common.REQUESTED_BLOCK_HEADER_NAME,
Value: strconv.FormatInt(protocolMessage.RelayPrivateData().GetRequestBlock(), 10),
})

routerKey := lavasession.NewRouterKeyFromExtensions(protocolMessage.GetExtensions())
erroredProviders := relayProcessor.GetUsedProviders().GetErroredProviders(routerKey)
if len(erroredProviders) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion scripts/pre_setups/init_lava_only_with_node.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ wait_next_block

screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \
127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \
$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level trace --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25
$EXTRA_PORTAL_FLAGS --geolocation 1 --optimizer-qos-listen --log_level trace --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25

echo "--- setting up screens done ---"
screen -ls
Loading