Skip to content

Commit

Permalink
feat: PRT - Add the consumer address to the QoS report (#1819)
Browse files Browse the repository at this point in the history
* Add the consumer address to the QoS report

* Test fix

---------

Co-authored-by: Ran Mishael <[email protected]>
  • Loading branch information
shleikes and ranlavanet authored Dec 3, 2024
1 parent 064945d commit e9cfc7d
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 29 deletions.
18 changes: 11 additions & 7 deletions protocol/metrics/consumer_optimizer_qos_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ var (
)

type ConsumerOptimizerQoSClient struct {
consumerOrigin string
queueSender *QueueSender
optimizers map[string]OptimizerInf // keys are chain ids
consumerHostname string
consumerAddress string
queueSender *QueueSender
optimizers map[string]OptimizerInf // keys are chain ids
// keys are chain ids, values are maps with provider addresses as keys
chainIdToProviderToRelaysCount map[string]map[string]uint64
chainIdToProviderToNodeErrorsCount map[string]map[string]uint64
Expand All @@ -49,7 +50,8 @@ type OptimizerQoSReportToSend struct {
LatencyScore float64 `json:"latency_score"`
GenericScore float64 `json:"generic_score"`
ProviderAddress string `json:"provider"`
ConsumerOrigin string `json:"consumer"`
ConsumerHostname string `json:"consumer_hostname"`
ConsumerAddress string `json:"consumer_pub_address"`
ChainId string `json:"chain_id"`
NodeErrorRate float64 `json:"node_error_rate"`
Epoch uint64 `json:"epoch"`
Expand All @@ -69,14 +71,15 @@ type OptimizerInf interface {
CalculateQoSScoresForMetrics(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) []*OptimizerQoSReport
}

func NewConsumerOptimizerQoSClient(endpointAddress string, interval ...time.Duration) *ConsumerOptimizerQoSClient {
func NewConsumerOptimizerQoSClient(consumerAddress, endpointAddress string, interval ...time.Duration) *ConsumerOptimizerQoSClient {
hostname, err := os.Hostname()
if err != nil {
utils.LavaFormatWarning("Error while getting hostname for ConsumerOptimizerQoSClient", err)
hostname = "unknown" + strconv.FormatUint(rand.Uint64(), 10) // random seed for different unknowns
}
return &ConsumerOptimizerQoSClient{
consumerOrigin: hostname,
consumerHostname: hostname,
consumerAddress: consumerAddress,
queueSender: NewQueueSender(endpointAddress, "ConsumerOptimizerQoS", nil, interval...),
optimizers: map[string]OptimizerInf{},
chainIdToProviderToRelaysCount: map[string]map[string]uint64{},
Expand Down Expand Up @@ -130,7 +133,8 @@ func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *Optimiz
// must be called under read lock
optimizerQoSReportToSend := OptimizerQoSReportToSend{
Timestamp: time.Now(),
ConsumerOrigin: coqc.consumerOrigin,
ConsumerHostname: coqc.consumerHostname,
ConsumerAddress: coqc.consumerAddress,
SyncScore: report.SyncScore,
AvailabilityScore: report.AvailabilityScore,
LatencyScore: report.LatencyScore,
Expand Down
2 changes: 1 addition & 1 deletion protocol/provideroptimizer/provider_optimizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ func TestProviderOptimizerWithOptimizerQoSClient(t *testing.T) {

chainId := "dontcare"

consumerOptimizerQoSClient := metrics.NewConsumerOptimizerQoSClient(mockHttpServer.URL, 1*time.Second)
consumerOptimizerQoSClient := metrics.NewConsumerOptimizerQoSClient("lava@test", mockHttpServer.URL, 1*time.Second)
consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(context.Background(), 900*time.Millisecond)

providerOptimizer := NewProviderOptimizer(STRATEGY_BALANCED, TEST_AVERAGE_BLOCK_TIME, TEST_BASE_WORLD_LATENCY, 10, consumerOptimizerQoSClient, chainId)
Expand Down
55 changes: 34 additions & 21 deletions protocol/rpcconsumer/rpcconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,33 @@ type rpcConsumerStartOptions struct {
staticProvidersList []*lavasession.RPCProviderEndpoint // define static providers as backup to lava providers
}

func getConsumerAddressAndKeys(clientCtx client.Context) (sdk.AccAddress, *secp256k1.PrivateKey, error) {
keyName, err := sigs.GetKeyName(clientCtx)
if err != nil {
return nil, nil, fmt.Errorf("failed getting key name from clientCtx: %w", err)
}

privKey, err := sigs.GetPrivKey(clientCtx, keyName)
if err != nil {
return nil, nil, fmt.Errorf("failed getting private key from key name %s: %w", keyName, err)
}

clientKey, _ := clientCtx.Keyring.Key(keyName)
pubkey, err := clientKey.GetPubKey()
if err != nil {
return nil, nil, fmt.Errorf("failed getting public key from key name %s: %w", keyName, err)
}

var consumerAddr sdk.AccAddress
err = consumerAddr.Unmarshal(pubkey.Address())
if err != nil {
return nil, nil, fmt.Errorf("failed unmarshaling public address for key %s (pubkey: %v): %w",
keyName, pubkey.Address(), err)
}

return consumerAddr, privKey, nil
}

// spawns a new RPCConsumer server with all it's processes and internals ready for communications
func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOptions) (err error) {
if common.IsTestMode(ctx) {
Expand All @@ -139,11 +166,16 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt
options.refererData.ReferrerClient = metrics.NewConsumerReferrerClient(options.refererData.Address)
consumerReportsManager := metrics.NewConsumerReportsClient(options.analyticsServerAddresses.ReportsAddressFlag)

consumerAddr, privKey, err := getConsumerAddressAndKeys(options.clientCtx)
if err != nil {
utils.LavaFormatFatal("failed to get consumer address and keys", err)
}

consumerUsageServeManager := metrics.NewConsumerRelayServerClient(options.analyticsServerAddresses.RelayServerAddress) // start up relay server reporting
var consumerOptimizerQoSClient *metrics.ConsumerOptimizerQoSClient
if options.analyticsServerAddresses.OptimizerQoSAddress != "" || options.analyticsServerAddresses.OptimizerQoSListen {
consumerOptimizerQoSClient = metrics.NewConsumerOptimizerQoSClient(options.analyticsServerAddresses.OptimizerQoSAddress, metrics.OptimizerQosServerPushInterval) // start up optimizer qos client
consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(ctx, metrics.OptimizerQosServerSamplingInterval) // start up optimizer qos client
consumerOptimizerQoSClient = metrics.NewConsumerOptimizerQoSClient(consumerAddr.String(), options.analyticsServerAddresses.OptimizerQoSAddress, metrics.OptimizerQosServerPushInterval) // start up optimizer qos client
consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(ctx, metrics.OptimizerQosServerSamplingInterval) // start up optimizer qos client
}
consumerMetricsManager := metrics.NewConsumerMetricsManager(metrics.ConsumerMetricsManagerOptions{
NetworkAddress: options.analyticsServerAddresses.MetricsListenAddress,
Expand Down Expand Up @@ -179,26 +211,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt
lavaChainFetcher.FetchLatestBlockNum(ctx)

lavaChainID := options.clientCtx.ChainID
keyName, err := sigs.GetKeyName(options.clientCtx)
if err != nil {
utils.LavaFormatFatal("failed getting key name from clientCtx", err)
}
privKey, err := sigs.GetPrivKey(options.clientCtx, keyName)
if err != nil {
utils.LavaFormatFatal("failed getting private key from key name", err, utils.Attribute{Key: "keyName", Value: keyName})
}
clientKey, _ := options.clientCtx.Keyring.Key(keyName)

pubkey, err := clientKey.GetPubKey()
if err != nil {
utils.LavaFormatFatal("failed getting public key from key name", err, utils.Attribute{Key: "keyName", Value: keyName})
}

var consumerAddr sdk.AccAddress
err = consumerAddr.Unmarshal(pubkey.Address())
if err != nil {
utils.LavaFormatFatal("failed unmarshaling public address", err, utils.Attribute{Key: "keyName", Value: keyName}, utils.Attribute{Key: "pubkey", Value: pubkey.Address()})
}
// we want one provider optimizer per chain so we will store them for reuse across rpcEndpoints
chainMutexes := map[string]*sync.Mutex{}
for _, endpoint := range options.rpcEndpoints {
Expand Down

0 comments on commit e9cfc7d

Please sign in to comment.